diff --git a/.github/workflows/daily_scan.yml b/.github/workflows/daily_scan.yml index 30b9476fe..10ee2ce4a 100644 --- a/.github/workflows/daily_scan.yml +++ b/.github/workflows/daily_scan.yml @@ -82,7 +82,7 @@ jobs: id: high_scan uses: ./.github/actions/image_scan with: - image-ref: "public.ecr.aws/aws-observability/adot-autoinstrumentation-python:v0.10.1" + image-ref: "public.ecr.aws/aws-observability/adot-autoinstrumentation-python:v0.10.0" severity: 'CRITICAL,HIGH' - name: Perform low image scan @@ -90,7 +90,7 @@ jobs: id: low_scan uses: ./.github/actions/image_scan with: - image-ref: "public.ecr.aws/aws-observability/adot-autoinstrumentation-python:v0.10.1" + image-ref: "public.ecr.aws/aws-observability/adot-autoinstrumentation-python:v0.10.0" severity: 'MEDIUM,LOW,UNKNOWN' - name: Configure AWS Credentials for emitting metrics diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index 3f16e5dca..b89b0bcb1 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -23,7 +23,6 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder -from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import CompactConsoleLogExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader @@ -47,7 +46,7 @@ ) from opentelemetry.sdk._events import EventLoggerProvider from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter, LogExporter +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, @@ -123,14 +122,7 @@ class OtlpLogHeaderSetting(NamedTuple): log_group: Optional[str] log_stream: Optional[str] namespace: Optional[str] - - def is_valid(self) -> bool: - """Check if the log header setting is valid by ensuring both log_group and log_stream are present.""" - return self.log_group is not None and self.log_stream is not None - - -# Singleton cache for OtlpLogHeaderSetting -_otlp_log_header_setting_cache: Optional[OtlpLogHeaderSetting] = None + is_valid: bool class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator): @@ -217,9 +209,6 @@ def _init_logging( set_logger_provider(provider) for _, exporter_class in exporters.items(): - if exporter_class is ConsoleLogExporter and _is_lambda_environment(): - exporter_class = CompactConsoleLogExporter - _logger.debug("Lambda environment detected, using CompactConsoleLogExporter instead of ConsoleLogExporter") exporter_args = {} _customize_log_record_processor( logger_provider=provider, log_exporter=_customize_logs_exporter(exporter_class(**exporter_args)) @@ -451,7 +440,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter: if isinstance(log_exporter, OTLPLogExporter): - if _fetch_logs_header().is_valid(): + if _validate_and_fetch_logs_header().is_valid: endpoint, 
region = _extract_endpoint_and_region_from_otlp_endpoint(logs_endpoint) # Setting default compression mode to Gzip as this is the behavior in upstream's # collector otlp http exporter: @@ -638,23 +627,18 @@ def _extract_endpoint_and_region_from_otlp_endpoint(endpoint: str): return endpoint, region -def _fetch_logs_header() -> OtlpLogHeaderSetting: - """Returns the OTLP log header setting as a singleton instance.""" - global _otlp_log_header_setting_cache # pylint: disable=global-statement - - if _otlp_log_header_setting_cache is not None: - return _otlp_log_header_setting_cache +def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: + """Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to + AWS OTLP Logs endpoint.""" logs_headers = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS) if not logs_headers: - if not _is_lambda_environment(): - _logger.warning( - "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS " - "to include x-aws-log-group and x-aws-log-stream" - ) - _otlp_log_header_setting_cache = OtlpLogHeaderSetting(None, None, None) - return _otlp_log_header_setting_cache + _logger.warning( + "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS " + "to include x-aws-log-group and x-aws-log-stream" + ) + return OtlpLogHeaderSetting(None, None, None, False) log_group = None log_stream = None @@ -672,14 +656,9 @@ def _fetch_logs_header() -> OtlpLogHeaderSetting: elif key == AWS_EMF_METRICS_NAMESPACE and value: namespace = value - _otlp_log_header_setting_cache = OtlpLogHeaderSetting(log_group, log_stream, namespace) - return _otlp_log_header_setting_cache - + is_valid = log_group is not None and log_stream is not None -def _clear_logs_header_cache(): - """Clear the singleton cache for OtlpLogHeaderSetting. Used primarily for testing.""" - global _otlp_log_header_setting_cache # pylint: disable=global-statement - _otlp_log_header_setting_cache = None + return OtlpLogHeaderSetting(log_group, log_stream, namespace, is_valid) def _get_metric_export_interval(): @@ -794,25 +773,8 @@ def _check_emf_exporter_enabled() -> bool: def _create_emf_exporter(): - """ - Create the appropriate EMF exporter based on the environment and configuration. 
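As a reference for the refactor above, here is a minimal, self-contained sketch of how the new _validate_and_fetch_logs_header computes a plain is_valid field instead of caching a singleton and calling an is_valid() method. The header key constants are spelled out for illustration; the exact value of AWS_EMF_METRICS_NAMESPACE is an assumption, not taken from this diff:

    from typing import NamedTuple, Optional

    LOG_GROUP_HEADER = "x-aws-log-group"         # key names per the warning message above
    LOG_STREAM_HEADER = "x-aws-log-stream"
    NAMESPACE_HEADER = "x-aws-metric-namespace"  # assumed value of AWS_EMF_METRICS_NAMESPACE

    class OtlpLogHeaderSetting(NamedTuple):
        log_group: Optional[str]
        log_stream: Optional[str]
        namespace: Optional[str]
        is_valid: bool  # now a precomputed field, replacing the old is_valid() method

    def parse_logs_headers(raw_headers: str) -> OtlpLogHeaderSetting:
        """Parse a 'key1=value1,key2=value2' OTLP headers string."""
        log_group = log_stream = namespace = None
        for pair in raw_headers.split(","):
            key, _, value = pair.partition("=")
            if key == LOG_GROUP_HEADER and value:
                log_group = value
            elif key == LOG_STREAM_HEADER and value:
                log_stream = value
            elif key == NAMESPACE_HEADER and value:
                namespace = value
        return OtlpLogHeaderSetting(
            log_group, log_stream, namespace,
            is_valid=log_group is not None and log_stream is not None,
        )

    # parse_logs_headers("x-aws-log-group=g,x-aws-log-stream=s").is_valid  -> True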
- - Returns: - ConsoleEmfExporter for Lambda without log headers log group and stream - AwsCloudWatchEmfExporter for other cases (when conditions are met) - None if CloudWatch exporter cannot be created - """ + """Create and configure the CloudWatch EMF exporter.""" try: - log_header_setting = _fetch_logs_header() - - # Lambda without valid logs http headers - use Console EMF exporter - if _is_lambda_environment() and not log_header_setting.is_valid(): - # pylint: disable=import-outside-toplevel - from amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter import ConsoleEmfExporter - - return ConsoleEmfExporter(namespace=log_header_setting.namespace) - - # For non-Lambda environment or Lambda with valid headers - use CloudWatch EMF exporter session = get_aws_session() # Check if botocore is available before importing the EMF exporter if not session: @@ -824,7 +786,9 @@ def _create_emf_exporter(): AwsCloudWatchEmfExporter, ) - if not log_header_setting.is_valid(): + log_header_setting = _validate_and_fetch_logs_header() + + if not log_header_setting.is_valid: return None return AwsCloudWatchEmfExporter( diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py index 487272024..35a41780d 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py @@ -3,7 +3,7 @@ import importlib import os import sys -from logging import ERROR, Logger, getLogger +from logging import Logger, getLogger from amazon.opentelemetry.distro._utils import get_aws_region, is_agent_observability_enabled from amazon.opentelemetry.distro.aws_opentelemetry_configurator import ( @@ -22,17 +22,12 @@ from opentelemetry import propagate from opentelemetry.distro import OpenTelemetryDistro from opentelemetry.environment_variables import OTEL_PROPAGATORS, OTEL_PYTHON_ID_GENERATOR -from opentelemetry.instrumentation.auto_instrumentation import _load -from opentelemetry.instrumentation.logging import LEVELS -from opentelemetry.instrumentation.logging.environment_variables import OTEL_PYTHON_LOG_LEVEL from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, OTEL_EXPORTER_OTLP_PROTOCOL, ) _logger: Logger = getLogger(__name__) -# Suppress configurator warnings from auto-instrumentation -_load._logger.setLevel(LEVELS.get(os.environ.get(OTEL_PYTHON_LOG_LEVEL, "error").lower(), ERROR)) class AwsOpenTelemetryDistro(OpenTelemetryDistro): diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py index 6603391a1..168ac7df1 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py @@ -3,18 +3,66 @@ # pylint: disable=no-self-use +import json import logging -from typing import Any, Dict, Optional +import math +import time +from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple -from opentelemetry.sdk.metrics.export import AggregationTemporality +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics import Histogram as 
HistogramInstr +from opentelemetry.sdk.metrics import ObservableCounter, ObservableGauge, ObservableUpDownCounter, UpDownCounter +from opentelemetry.sdk.metrics._internal.point import Metric +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + ExponentialHistogram, + Gauge, + Histogram, + MetricExporter, + MetricExportResult, + MetricsData, + NumberDataPoint, + Sum, +) +from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation +from opentelemetry.sdk.resources import Resource +from opentelemetry.util.types import Attributes from ._cloudwatch_log_client import CloudWatchLogClient -from .base_emf_exporter import BaseEmfExporter logger = logging.getLogger(__name__) -class AwsCloudWatchEmfExporter(BaseEmfExporter): +class MetricRecord: + """The metric data unified representation of all OTel metrics for OTel to CW EMF conversion.""" + + def __init__(self, metric_name: str, metric_unit: str, metric_description: str): + """ + Initialize metric record. + + Args: + metric_name: Name of the metric + metric_unit: Unit of the metric + metric_description: Description of the metric + """ + # Instrument metadata + self.name = metric_name + self.unit = metric_unit + self.description = metric_description + + # Will be set by conversion methods + self.timestamp: Optional[int] = None + self.attributes: Attributes = {} + + # Different metric type data - only one will be set per record + self.value: Optional[float] = None + self.sum_data: Optional[Any] = None + self.histogram_data: Optional[Any] = None + self.exp_histogram_data: Optional[Any] = None + + +class AwsCloudWatchEmfExporter(MetricExporter): """ OpenTelemetry metrics exporter for CloudWatch EMF format. @@ -26,6 +74,50 @@ class AwsCloudWatchEmfExporter(BaseEmfExporter): """ + # CloudWatch EMF supported units + # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html + EMF_SUPPORTED_UNITS = { + "Seconds", + "Microseconds", + "Milliseconds", + "Bytes", + "Kilobytes", + "Megabytes", + "Gigabytes", + "Terabytes", + "Bits", + "Kilobits", + "Megabits", + "Gigabits", + "Terabits", + "Percent", + "Count", + "Bytes/Second", + "Kilobytes/Second", + "Megabytes/Second", + "Gigabytes/Second", + "Terabytes/Second", + "Bits/Second", + "Kilobits/Second", + "Megabits/Second", + "Gigabits/Second", + "Terabits/Second", + "Count/Second", + "None", + } + + # OTel to CloudWatch unit mapping + # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 + UNIT_MAPPING = { + "1": "", + "ns": "", + "ms": "Milliseconds", + "s": "Seconds", + "us": "Microseconds", + "By": "Bytes", + "bit": "Bits", + } + def __init__( self, namespace: str = "default", @@ -48,8 +140,26 @@ def __init__( preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation **kwargs: Additional arguments passed to botocore client """ - super().__init__(namespace, preferred_temporality, preferred_aggregation) + # Set up temporality preference default to DELTA if customers not set + if preferred_temporality is None: + preferred_temporality = { + Counter: AggregationTemporality.DELTA, + HistogramInstr: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableGauge: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.DELTA, + } + + # Set up aggregation preference default to exponential histogram for histogram metrics + if preferred_aggregation is None: + 
preferred_aggregation = { + HistogramInstr: ExponentialBucketHistogramAggregation(), + } + + super().__init__(preferred_temporality, preferred_aggregation) + self.namespace = namespace self.log_group_name = log_group_name # Initialize CloudWatch Logs client @@ -57,7 +167,359 @@ def __init__( log_group_name=log_group_name, log_stream_name=log_stream_name, aws_region=aws_region, **kwargs ) - def _export(self, log_event: Dict[str, Any]): + def _get_metric_name(self, record: MetricRecord) -> Optional[str]: + """Get the metric name from the metric record or data point.""" + + try: + if record.name: + return record.name + except AttributeError: + pass + # Return None if no valid metric name found + return None + + def _get_unit(self, record: MetricRecord) -> Optional[str]: + """Get CloudWatch unit from MetricRecord unit.""" + unit = record.unit + + if not unit: + return None + + # First check if unit is already a supported EMF unit + if unit in self.EMF_SUPPORTED_UNITS: + return unit + + # Map from OTel unit to CloudWatch unit + mapped_unit = self.UNIT_MAPPING.get(unit) + + return mapped_unit + + def _get_dimension_names(self, attributes: Attributes) -> List[str]: + """Extract dimension names from attributes.""" + # Implement dimension selection logic + # For now, use all attributes as dimensions + return list(attributes.keys()) + + def _get_attributes_key(self, attributes: Attributes) -> str: + """ + Create a hashable key from attributes for grouping metrics. + + Args: + attributes: The attributes dictionary + + Returns: + A string representation of sorted attributes key-value pairs + """ + # Sort the attributes to ensure consistent keys + sorted_attrs = sorted(attributes.items()) + # Create a string representation of the attributes + return str(sorted_attrs) + + def _normalize_timestamp(self, timestamp_ns: int) -> int: + """ + Normalize a nanosecond timestamp to milliseconds for CloudWatch. + + Args: + timestamp_ns: Timestamp in nanoseconds + + Returns: + Timestamp in milliseconds + """ + # Convert from nanoseconds to milliseconds + return timestamp_ns // 1_000_000 + + def _create_metric_record(self, metric_name: str, metric_unit: str, metric_description: str) -> MetricRecord: + """ + Creates the intermediate metric data structure that standardizes different otel metric representation + and will be used to generate EMF events. The base record + establishes the instrument schema (name/unit/description) that will be populated + with dimensions, timestamps, and values during metric processing. + + Args: + metric_name: Name of the metric + metric_unit: Unit of the metric + metric_description: Description of the metric + + Returns: + A MetricRecord object + """ + return MetricRecord(metric_name, metric_unit, metric_description) + + def _convert_gauge_and_sum(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: + """Convert a Gauge or Sum metric datapoint to a metric record. 
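The two small helpers above (_get_attributes_key and _normalize_timestamp) are easy to sanity-check in isolation; a couple of self-contained assertions with made-up values, not tests from this PR:

    # Attribute ordering must not change the grouping key.
    attrs_a = {"env": "prod", "service": "api"}
    attrs_b = {"service": "api", "env": "prod"}
    assert str(sorted(attrs_a.items())) == str(sorted(attrs_b.items()))

    # CloudWatch expects millisecond timestamps; OTel datapoints carry nanoseconds.
    assert 1_700_000_000_123_456_789 // 1_000_000 == 1_700_000_000_123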
+ + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and value + """ + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # Set the value directly for both Gauge and Sum + record.value = data_point.value + + return record + + def _convert_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: + """Convert a Histogram metric datapoint to a metric record. + + https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and histogram_data + """ + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # For Histogram, set the histogram_data + record.histogram_data = { + "Count": data_point.count, + "Sum": data_point.sum, + "Min": data_point.min, + "Max": data_point.max, + } + return record + + # pylint: disable=too-many-locals + def _convert_exp_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: + """ + Convert an ExponentialHistogram metric datapoint to a metric record. + + This function follows the logic of CalculateDeltaDatapoints in the Go implementation, + converting exponential buckets to their midpoint values. 
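To make the midpoint math concrete: with scale = 1 the base is 2^(2^-1) = sqrt(2), and the positive bucket at index i spans (base^i, base^(i+1)]. A runnable sketch of the loop that follows, assuming offset 0 and bucket_counts [1, 2] (illustrative values only, not the exporter's code path):

    import math

    scale = 1
    base = math.pow(2, math.pow(2, float(-scale)))  # 2^(2^-1) = sqrt(2) ~= 1.414214

    values, counts = [], []
    bucket_begin = bucket_end = 0.0
    for index, count in enumerate([1, 2]):          # positive offset assumed 0
        bucket_begin = math.pow(base, float(index)) if bucket_begin == 0 else bucket_end
        bucket_end = math.pow(base, float(index + 1))
        if count > 0:
            values.append((bucket_begin + bucket_end) / 2)  # bucket midpoint
            counts.append(float(count))

    print(values)  # [1.2071..., 1.7071...] -- midpoints of (1, sqrt(2)] and (sqrt(2), 2]
    print(counts)  # [1.0, 2.0]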
+ + Ref: + https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and exp_histogram_data + """ + + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # Initialize arrays for values and counts + array_values = [] + array_counts = [] + + # Get scale + scale = data_point.scale + # Calculate base using the formula: 2^(2^(-scale)) + base = math.pow(2, math.pow(2, float(-scale))) + + # Process positive buckets + if data_point.positive and data_point.positive.bucket_counts: + positive_offset = getattr(data_point.positive, "offset", 0) + positive_bucket_counts = data_point.positive.bucket_counts + + bucket_begin = 0 + bucket_end = 0 + + for bucket_index, count in enumerate(positive_bucket_counts): + index = bucket_index + positive_offset + + if bucket_begin == 0: + bucket_begin = math.pow(base, float(index)) + else: + bucket_begin = bucket_end + + bucket_end = math.pow(base, float(index + 1)) + + # Calculate midpoint value of the bucket + metric_val = (bucket_begin + bucket_end) / 2 + + # Only include buckets with positive counts + if count > 0: + array_values.append(metric_val) + array_counts.append(float(count)) + + # Process zero bucket + zero_count = getattr(data_point, "zero_count", 0) + if zero_count > 0: + array_values.append(0) + array_counts.append(float(zero_count)) + + # Process negative buckets + if data_point.negative and data_point.negative.bucket_counts: + negative_offset = getattr(data_point.negative, "offset", 0) + negative_bucket_counts = data_point.negative.bucket_counts + + bucket_begin = 0 + bucket_end = 0 + + for bucket_index, count in enumerate(negative_bucket_counts): + index = bucket_index + negative_offset + + if bucket_end == 0: + bucket_end = -math.pow(base, float(index)) + else: + bucket_end = bucket_begin + + bucket_begin = -math.pow(base, float(index + 1)) + + # Calculate midpoint value of the bucket + metric_val = (bucket_begin + bucket_end) / 2 + + # Only include buckets with positive counts + if count > 0: + array_values.append(metric_val) + array_counts.append(float(count)) + + # Set the histogram data in the format expected by CloudWatch EMF + record.exp_histogram_data = { + "Values": array_values, + "Counts": array_counts, + "Count": data_point.count, + "Sum": data_point.sum, + "Max": data_point.max, + "Min": data_point.min, + } + + return record + + def _group_by_attributes_and_timestamp(self, record: MetricRecord) -> Tuple[str, int]: + """Group metric record by attributes and timestamp. + + Args: + record: The metric record + + Returns: + A tuple key for grouping + """ + # Create a key for grouping based on attributes + attrs_key = self._get_attributes_key(record.attributes) + return (attrs_key, record.timestamp) + + def _create_emf_log( + self, metric_records: List[MetricRecord], resource: Resource, timestamp: Optional[int] = None + ) -> Dict: + """ + Create EMF log dictionary from metric records. + + Since metric_records is already grouped by attributes, this function + creates a single EMF log for all records. 
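For orientation, the EMF document assembled by the code that follows looks roughly like this for a single gauge with one "service" dimension (field names mirror the implementation below; all values are made up):

    emf_log = {
        "_aws": {
            "Timestamp": 1_700_000_000_123,  # milliseconds
            "CloudWatchMetrics": [{
                "Namespace": "default",
                "Dimensions": [["service"]],
                "Metrics": [{"Name": "request.latency", "Unit": "Milliseconds"}],
            }],
        },
        "Version": "1",
        "otel.resource.service.name": "my-app",  # resource attrs: prefixed metadata, not dimensions
        "service": "api",                        # dimension value at the root
        "request.latency": 12.3,                 # metric value at the root
    }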
+ """ + # Start with base structure + emf_log = {"_aws": {"Timestamp": timestamp or int(time.time() * 1000), "CloudWatchMetrics": []}} + + # Set with latest EMF version schema + # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414 + emf_log["Version"] = "1" + + # Add resource attributes to EMF log but not as dimensions + # OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes + # as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, + # we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. + # And have resource attributes as just additional metadata in EMF, added otel.resource as prefix to distinguish. + if resource and resource.attributes: + for key, value in resource.attributes.items(): + emf_log[f"otel.resource.{key}"] = str(value) + + # Initialize collections for dimensions and metrics + metric_definitions = [] + # Collect attributes from all records (they should be the same for all records in the group) + # Only collect once from the first record and apply to all records + all_attributes = ( + metric_records[0].attributes + if metric_records and len(metric_records) > 0 and metric_records[0].attributes + else {} + ) + + # Process each metric record + for record in metric_records: + + metric_name = self._get_metric_name(record) + + # Skip processing if metric name is None or empty + if not metric_name: + continue + + # Create metric data dict + metric_data = {"Name": metric_name} + + unit = self._get_unit(record) + if unit: + metric_data["Unit"] = unit + + # Process different types of aggregations + if record.exp_histogram_data: + # Base2 Exponential Histogram + emf_log[metric_name] = record.exp_histogram_data + elif record.histogram_data: + # Regular Histogram metrics + emf_log[metric_name] = record.histogram_data + elif record.value is not None: + # Gauge, Sum, and other aggregations + emf_log[metric_name] = record.value + else: + logger.debug("Skipping metric %s as it does not have valid metric value", metric_name) + continue + + # Add to metric definitions list + metric_definitions.append(metric_data) + + # Get dimension names from collected attributes + dimension_names = self._get_dimension_names(all_attributes) + + # Add attribute values to the root of the EMF log + for name, value in all_attributes.items(): + emf_log[name] = str(value) + + # Add CloudWatch Metrics if we have metrics, include dimensions only if they exist + if metric_definitions: + cloudwatch_metric = {"Namespace": self.namespace, "Metrics": metric_definitions} + if dimension_names: + cloudwatch_metric["Dimensions"] = [dimension_names] + + emf_log["_aws"]["CloudWatchMetrics"].append(cloudwatch_metric) + + return emf_log + + def _send_log_event(self, log_event: Dict[str, Any]): """ Send a log event to CloudWatch Logs using the log client. @@ -66,6 +528,82 @@ def _export(self, log_event: Dict[str, Any]): """ self.log_client.send_log_event(log_event) + # pylint: disable=too-many-nested-blocks,unused-argument,too-many-branches + def export( + self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **_kwargs: Any + ) -> MetricExportResult: + """ + Export metrics as EMF logs to CloudWatch. + + Groups metrics by attributes and timestamp before creating EMF logs. 
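A minimal sketch of the grouping step the export() implementation below performs, with fabricated records: datapoints that share attributes and timestamp collapse into a single EMF log event.

    from collections import defaultdict

    records = [("latency", {"env": "prod"}, 1_700_000_000_123),
               ("errors",  {"env": "prod"}, 1_700_000_000_123),
               ("latency", {"env": "test"}, 1_700_000_000_123)]

    grouped = defaultdict(list)
    for name, attributes, timestamp_ms in records:
        grouped[(str(sorted(attributes.items())), timestamp_ms)].append(name)

    print(len(grouped))  # 2 -> two EMF log events are emitted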
+ + Args: + metrics_data: MetricsData containing resource metrics and scope metrics + timeout_millis: Optional timeout in milliseconds + **kwargs: Additional keyword arguments + + Returns: + MetricExportResult indicating success or failure + """ + try: + if not metrics_data.resource_metrics: + return MetricExportResult.SUCCESS + + # Process all metrics from all resource metrics and scope metrics + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + # Dictionary to group metrics by attributes and timestamp + grouped_metrics = defaultdict(list) + + # Process all metrics in this scope + for metric in scope_metrics.metrics: + # Skip if metric.data is None or no data_points exists + try: + if not (metric.data and metric.data.data_points): + continue + except AttributeError: + # Metric doesn't have data or data_points attribute + continue + + # Process metrics based on type + metric_type = type(metric.data) + if metric_type in (Gauge, Sum): + for dp in metric.data.data_points: + record = self._convert_gauge_and_sum(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + elif metric_type == Histogram: + for dp in metric.data.data_points: + record = self._convert_histogram(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + elif metric_type == ExponentialHistogram: + for dp in metric.data.data_points: + record = self._convert_exp_histogram(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + else: + logger.debug("Unsupported Metric Type: %s", metric_type) + + # Now process each group separately to create one EMF log per group + for (_, timestamp_ms), metric_records in grouped_metrics.items(): + if not metric_records: + continue + + # Create and send EMF log for this batch of metrics + self._send_log_event( + { + "message": json.dumps( + self._create_emf_log(metric_records, resource_metrics.resource, timestamp_ms) + ), + "timestamp": timestamp_ms, + } + ) + + return MetricExportResult.SUCCESS + # pylint: disable=broad-exception-caught + # capture all types of exceptions to not interrupt the instrumented services + except Exception as error: + logger.error("Failed to export metrics: %s", error) + return MetricExportResult.FAILURE + def force_flush(self, timeout_millis: int = 10000) -> bool: # pylint: disable=unused-argument """ Force flush any pending metrics. diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py deleted file mode 100644 index 48cb5481a..000000000 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py +++ /dev/null @@ -1,596 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
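Stepping back from the individual hunks: a hedged end-to-end sketch of wiring the consolidated exporter defined above into an SDK metrics pipeline. The constructor arguments follow the __init__ shown earlier; the log group/stream names are placeholders, and botocore credentials are assumed to be configured in the environment.

    from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import (
        AwsCloudWatchEmfExporter,
    )
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

    exporter = AwsCloudWatchEmfExporter(
        namespace="MyApp",                 # CloudWatch namespace
        log_group_name="/metrics/my-app",  # placeholder
        log_stream_name="instance-1",      # placeholder
    )
    provider = MeterProvider(metric_readers=[PeriodicExportingMetricReader(exporter)])
    meter = provider.get_meter("example")
    meter.create_counter("requests", unit="1").add(1, {"service": "api"})
    provider.force_flush()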
-# SPDX-License-Identifier: Apache-2.0 - -# pylint: disable=no-self-use - -import json -import logging -import math -import time -from abc import ABC, abstractmethod -from collections import defaultdict -from typing import Any, Dict, List, Optional, Tuple - -from opentelemetry.sdk.metrics import Counter -from opentelemetry.sdk.metrics import Histogram as HistogramInstr -from opentelemetry.sdk.metrics import ObservableCounter, ObservableGauge, ObservableUpDownCounter, UpDownCounter -from opentelemetry.sdk.metrics._internal.point import Metric -from opentelemetry.sdk.metrics.export import ( - AggregationTemporality, - ExponentialHistogram, - Gauge, - Histogram, - MetricExporter, - MetricExportResult, - MetricsData, - NumberDataPoint, - Sum, -) -from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation -from opentelemetry.sdk.resources import Resource -from opentelemetry.util.types import Attributes - -logger = logging.getLogger(__name__) - - -class MetricRecord: - """The metric data unified representation of all OTel metrics for OTel to CW EMF conversion.""" - - def __init__(self, metric_name: str, metric_unit: str, metric_description: str): - """ - Initialize metric record. - - Args: - metric_name: Name of the metric - metric_unit: Unit of the metric - metric_description: Description of the metric - """ - # Instrument metadata - self.name = metric_name - self.unit = metric_unit - self.description = metric_description - - # Will be set by conversion methods - self.timestamp: Optional[int] = None - self.attributes: Attributes = {} - - # Different metric type data - only one will be set per record - self.value: Optional[float] = None - self.sum_data: Optional[Any] = None - self.histogram_data: Optional[Any] = None - self.exp_histogram_data: Optional[Any] = None - - -class BaseEmfExporter(MetricExporter, ABC): - """ - Base class for OpenTelemetry metrics exporters that convert to CloudWatch EMF format. - - This class contains all the common logic for converting OTel metrics into CloudWatch EMF logs. - Subclasses need to implement the _export method to define where the EMF logs are sent. - - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html - - """ - - # CloudWatch EMF supported units - # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html - EMF_SUPPORTED_UNITS = { - "Seconds", - "Microseconds", - "Milliseconds", - "Bytes", - "Kilobytes", - "Megabytes", - "Gigabytes", - "Terabytes", - "Bits", - "Kilobits", - "Megabits", - "Gigabits", - "Terabits", - "Percent", - "Count", - "Bytes/Second", - "Kilobytes/Second", - "Megabytes/Second", - "Gigabytes/Second", - "Terabytes/Second", - "Bits/Second", - "Kilobits/Second", - "Megabits/Second", - "Gigabits/Second", - "Terabits/Second", - "Count/Second", - "None", - } - - # OTel to CloudWatch unit mapping - # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 - UNIT_MAPPING = { - "1": "", - "ns": "", - "ms": "Milliseconds", - "s": "Seconds", - "us": "Microseconds", - "By": "Bytes", - "bit": "Bits", - } - - def __init__( - self, - namespace: str = "default", - preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None, - preferred_aggregation: Optional[Dict[type, Any]] = None, - ): - """ - Initialize the base EMF exporter. 
- - Args: - namespace: CloudWatch namespace for metrics - preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality - preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation - """ - # Set up temporality preference default to DELTA if customers not set - if preferred_temporality is None: - preferred_temporality = { - Counter: AggregationTemporality.DELTA, - HistogramInstr: AggregationTemporality.DELTA, - ObservableCounter: AggregationTemporality.DELTA, - ObservableGauge: AggregationTemporality.DELTA, - ObservableUpDownCounter: AggregationTemporality.DELTA, - UpDownCounter: AggregationTemporality.DELTA, - } - - # Set up aggregation preference default to exponential histogram for histogram metrics - if preferred_aggregation is None: - preferred_aggregation = { - HistogramInstr: ExponentialBucketHistogramAggregation(), - } - - super().__init__(preferred_temporality, preferred_aggregation) - - self.namespace = namespace - - def _get_metric_name(self, record: MetricRecord) -> Optional[str]: - """Get the metric name from the metric record or data point.""" - - try: - if record.name: - return record.name - except AttributeError: - pass - # Return None if no valid metric name found - return None - - def _get_unit(self, record: MetricRecord) -> Optional[str]: - """Get CloudWatch unit from MetricRecord unit.""" - unit = record.unit - - if not unit: - return None - - # First check if unit is already a supported EMF unit - if unit in self.EMF_SUPPORTED_UNITS: - return unit - - # Map from OTel unit to CloudWatch unit - mapped_unit = self.UNIT_MAPPING.get(unit) - - return mapped_unit - - def _get_dimension_names(self, attributes: Attributes) -> List[str]: - """Extract dimension names from attributes.""" - # Implement dimension selection logic - # For now, use all attributes as dimensions - return list(attributes.keys()) - - def _get_attributes_key(self, attributes: Attributes) -> str: - """ - Create a hashable key from attributes for grouping metrics. - - Args: - attributes: The attributes dictionary - - Returns: - A string representation of sorted attributes key-value pairs - """ - # Sort the attributes to ensure consistent keys - sorted_attrs = sorted(attributes.items()) - # Create a string representation of the attributes - return str(sorted_attrs) - - def _normalize_timestamp(self, timestamp_ns: int) -> int: - """ - Normalize a nanosecond timestamp to milliseconds for CloudWatch. - - Args: - timestamp_ns: Timestamp in nanoseconds - - Returns: - Timestamp in milliseconds - """ - # Convert from nanoseconds to milliseconds - return timestamp_ns // 1_000_000 - - def _create_metric_record(self, metric_name: str, metric_unit: str, metric_description: str) -> MetricRecord: - """ - Creates the intermediate metric data structure that standardizes different otel metric representation - and will be used to generate EMF events. The base record - establishes the instrument schema (name/unit/description) that will be populated - with dimensions, timestamps, and values during metric processing. - - Args: - metric_name: Name of the metric - metric_unit: Unit of the metric - metric_description: Description of the metric - - Returns: - A MetricRecord object - """ - return MetricRecord(metric_name, metric_unit, metric_description) - - def _convert_gauge_and_sum(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: - """Convert a Gauge or Sum metric datapoint to a metric record. 
- - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and value - """ - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # Set the value directly for both Gauge and Sum - record.value = data_point.value - - return record - - def _convert_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: - """Convert a Histogram metric datapoint to a metric record. - - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 - - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and histogram_data - """ - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # For Histogram, set the histogram_data - record.histogram_data = { - "Count": data_point.count, - "Sum": data_point.sum, - "Min": data_point.min, - "Max": data_point.max, - } - return record - - # pylint: disable=too-many-locals - def _convert_exp_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: - """ - Convert an ExponentialHistogram metric datapoint to a metric record. - - This function follows the logic of CalculateDeltaDatapoints in the Go implementation, - converting exponential buckets to their midpoint values. 
- - Ref: - https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 - - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and exp_histogram_data - """ - - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # Initialize arrays for values and counts - array_values = [] - array_counts = [] - - # Get scale - scale = data_point.scale - # Calculate base using the formula: 2^(2^(-scale)) - base = math.pow(2, math.pow(2, float(-scale))) - - # Process positive buckets - if data_point.positive and data_point.positive.bucket_counts: - positive_offset = getattr(data_point.positive, "offset", 0) - positive_bucket_counts = data_point.positive.bucket_counts - - bucket_begin = 0 - bucket_end = 0 - - for bucket_index, count in enumerate(positive_bucket_counts): - index = bucket_index + positive_offset - - if bucket_begin == 0: - bucket_begin = math.pow(base, float(index)) - else: - bucket_begin = bucket_end - - bucket_end = math.pow(base, float(index + 1)) - - # Calculate midpoint value of the bucket - metric_val = (bucket_begin + bucket_end) / 2 - - # Only include buckets with positive counts - if count > 0: - array_values.append(metric_val) - array_counts.append(float(count)) - - # Process zero bucket - zero_count = getattr(data_point, "zero_count", 0) - if zero_count > 0: - array_values.append(0) - array_counts.append(float(zero_count)) - - # Process negative buckets - if data_point.negative and data_point.negative.bucket_counts: - negative_offset = getattr(data_point.negative, "offset", 0) - negative_bucket_counts = data_point.negative.bucket_counts - - bucket_begin = 0 - bucket_end = 0 - - for bucket_index, count in enumerate(negative_bucket_counts): - index = bucket_index + negative_offset - - if bucket_end == 0: - bucket_end = -math.pow(base, float(index)) - else: - bucket_end = bucket_begin - - bucket_begin = -math.pow(base, float(index + 1)) - - # Calculate midpoint value of the bucket - metric_val = (bucket_begin + bucket_end) / 2 - - # Only include buckets with positive counts - if count > 0: - array_values.append(metric_val) - array_counts.append(float(count)) - - # Set the histogram data in the format expected by CloudWatch EMF - record.exp_histogram_data = { - "Values": array_values, - "Counts": array_counts, - "Count": data_point.count, - "Sum": data_point.sum, - "Max": data_point.max, - "Min": data_point.min, - } - - return record - - def _group_by_attributes_and_timestamp(self, record: MetricRecord) -> Tuple[str, int]: - """Group metric record by attributes and timestamp. - - Args: - record: The metric record - - Returns: - A tuple key for grouping - """ - # Create a key for grouping based on attributes - attrs_key = self._get_attributes_key(record.attributes) - return (attrs_key, record.timestamp) - - def _create_emf_log( - self, metric_records: List[MetricRecord], resource: Resource, timestamp: Optional[int] = None - ) -> Dict: - """ - Create EMF log dictionary from metric records. - - Since metric_records is already grouped by attributes, this function - creates a single EMF log for all records. 
- """ - # Start with base structure - emf_log = {"_aws": {"Timestamp": timestamp or int(time.time() * 1000), "CloudWatchMetrics": []}} - - # Set with latest EMF version schema - # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414 - emf_log["Version"] = "1" - - # Add resource attributes to EMF log but not as dimensions - # OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes - # as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, - # we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. - # And have resource attributes as just additional metadata in EMF, added otel.resource as prefix to distinguish. - if resource and resource.attributes: - for key, value in resource.attributes.items(): - emf_log[f"otel.resource.{key}"] = str(value) - - # Initialize collections for dimensions and metrics - metric_definitions = [] - # Collect attributes from all records (they should be the same for all records in the group) - # Only collect once from the first record and apply to all records - all_attributes = ( - metric_records[0].attributes - if metric_records and len(metric_records) > 0 and metric_records[0].attributes - else {} - ) - - # Process each metric record - for record in metric_records: - - metric_name = self._get_metric_name(record) - - # Skip processing if metric name is None or empty - if not metric_name: - continue - - # Create metric data dict - metric_data = {"Name": metric_name} - - unit = self._get_unit(record) - if unit: - metric_data["Unit"] = unit - - # Process different types of aggregations - if record.exp_histogram_data: - # Base2 Exponential Histogram - emf_log[metric_name] = record.exp_histogram_data - elif record.histogram_data: - # Regular Histogram metrics - emf_log[metric_name] = record.histogram_data - elif record.value is not None: - # Gauge, Sum, and other aggregations - emf_log[metric_name] = record.value - else: - logger.debug("Skipping metric %s as it does not have valid metric value", metric_name) - continue - - # Add to metric definitions list - metric_definitions.append(metric_data) - - # Get dimension names from collected attributes - dimension_names = self._get_dimension_names(all_attributes) - - # Add attribute values to the root of the EMF log - for name, value in all_attributes.items(): - emf_log[name] = str(value) - - # Add CloudWatch Metrics if we have metrics, include dimensions only if they exist - if metric_definitions: - cloudwatch_metric = {"Namespace": self.namespace, "Metrics": metric_definitions} - if dimension_names: - cloudwatch_metric["Dimensions"] = [dimension_names] - - emf_log["_aws"]["CloudWatchMetrics"].append(cloudwatch_metric) - - return emf_log - - @abstractmethod - def _export(self, log_event: Dict[str, Any]): - """ - Send a log event to the destination (CloudWatch Logs, console, etc.). - - This method must be implemented by subclasses to define where the EMF logs are sent. - - Args: - log_event: The log event to send - """ - - # pylint: disable=too-many-nested-blocks,unused-argument,too-many-branches - def export( - self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **_kwargs: Any - ) -> MetricExportResult: - """ - Export metrics as EMF logs. - - Groups metrics by attributes and timestamp before creating EMF logs. 
- - Args: - metrics_data: MetricsData containing resource metrics and scope metrics - timeout_millis: Optional timeout in milliseconds - **kwargs: Additional keyword arguments - - Returns: - MetricExportResult indicating success or failure - """ - try: - if not metrics_data.resource_metrics: - return MetricExportResult.SUCCESS - - # Process all metrics from all resource metrics and scope metrics - for resource_metrics in metrics_data.resource_metrics: - for scope_metrics in resource_metrics.scope_metrics: - # Dictionary to group metrics by attributes and timestamp - grouped_metrics = defaultdict(list) - - # Process all metrics in this scope - for metric in scope_metrics.metrics: - # Skip if metric.data is None or no data_points exists - try: - if not (metric.data and metric.data.data_points): - continue - except AttributeError: - # Metric doesn't have data or data_points attribute - continue - - # Process metrics based on type - metric_type = type(metric.data) - if metric_type in (Gauge, Sum): - for dp in metric.data.data_points: - record = self._convert_gauge_and_sum(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - elif metric_type == Histogram: - for dp in metric.data.data_points: - record = self._convert_histogram(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - elif metric_type == ExponentialHistogram: - for dp in metric.data.data_points: - record = self._convert_exp_histogram(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - else: - logger.debug("Unsupported Metric Type: %s", metric_type) - - # Now process each group separately to create one EMF log per group - for (_, timestamp_ms), metric_records in grouped_metrics.items(): - if not metric_records: - continue - - # Create and send EMF log for this batch of metrics - logger.debug( - "Creating EMF log for %d records at timestamp %s", - len(metric_records), - timestamp_ms, - ) - self._export( - { - "message": json.dumps( - self._create_emf_log(metric_records, resource_metrics.resource, timestamp_ms) - ), - "timestamp": timestamp_ms, - } - ) - - return MetricExportResult.SUCCESS - # pylint: disable=broad-exception-caught - # capture all types of exceptions to not interrupt the instrumented services - except Exception as error: - logger.error("Failed to export metrics: %s", error) - return MetricExportResult.FAILURE diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py deleted file mode 100644 index 8563c16b2..000000000 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py +++ /dev/null @@ -1,104 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -# SPDX-License-Identifier: Apache-2.0 - -# pylint: disable=no-self-use - -import logging -from typing import Any, Dict, Optional - -from opentelemetry.sdk.metrics.export import AggregationTemporality - -from .base_emf_exporter import BaseEmfExporter - -logger = logging.getLogger(__name__) - - -class ConsoleEmfExporter(BaseEmfExporter): - """ - OpenTelemetry metrics exporter for CloudWatch EMF format to console output. - - This exporter converts OTel metrics into CloudWatch EMF logs and writes them - to standard output instead of sending to CloudWatch Logs. 
This is useful for - debugging, testing, or when you want to process EMF logs with other tools. - - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html - - """ - - def __init__( - self, - namespace: str = "default", - preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None, - preferred_aggregation: Optional[Dict[type, Any]] = None, - **kwargs: Any, - ) -> None: - """ - Initialize the Console EMF exporter. - - Args: - namespace: CloudWatch namespace for metrics (defaults to "default") - preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality - preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation - **kwargs: Additional arguments (unused, kept for compatibility) - """ - # No need to check for None since namespace has a default value - if namespace is None: - namespace = "default" - super().__init__(namespace, preferred_temporality, preferred_aggregation) - - def _export(self, log_event: Dict[str, Any]) -> None: - """ - Send a log event message to stdout for console output. - - This method writes the EMF log message to stdout, making it easy to - capture and redirect the output for processing or debugging purposes. - - Args: - log_event: The log event dictionary containing 'message' and 'timestamp' - keys, where 'message' is the JSON-serialized EMF log - - Raises: - No exceptions are raised - errors are logged and handled gracefully - """ - try: - # Write the EMF log message to stdout for easy redirection/capture - message = log_event.get("message", "") - if message: - print(message, flush=True) - else: - logger.warning("Empty message in log event: %s", log_event) - except Exception as error: # pylint: disable=broad-exception-caught - logger.error("Failed to write EMF log to console. Log event: %s. Error: %s", log_event, error) - - def force_flush(self, timeout_millis: int = 10000) -> bool: - """ - Force flush any pending metrics. - - For console output, there's no buffering since we use print() with - flush=True, so this is effectively a no-op that always succeeds. - - Args: - timeout_millis: Timeout in milliseconds (unused for console output) - - Returns: - Always returns True as console output is immediately flushed - """ - logger.debug("ConsoleEmfExporter force_flush called - no buffering to flush for console output") - return True - - def shutdown(self, timeout_millis: Optional[int] = None, **kwargs: Any) -> bool: - """ - Shutdown the exporter. - - For console output, there are no resources to clean up or connections - to close, so this is effectively a no-op that always succeeds. - - Args: - timeout_millis: Timeout in milliseconds (unused for console output) - **kwargs: Additional keyword arguments (unused for console output) - - Returns: - Always returns True as there's no cleanup required for console output - """ - logger.debug("ConsoleEmfExporter shutdown called with timeout_millis=%s", timeout_millis) - return True diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py deleted file mode 100644 index 18b7e26de..000000000 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
-# SPDX-License-Identifier: Apache-2.0 -import re -from typing import Sequence - -from opentelemetry.sdk._logs import LogData -from opentelemetry.sdk._logs.export import ConsoleLogExporter, LogExportResult - - -class CompactConsoleLogExporter(ConsoleLogExporter): - def export(self, batch: Sequence[LogData]): - for data in batch: - formatted_json = self.formatter(data.log_record) - print(re.sub(r"\s*([{}[\]:,])\s*", r"\1", formatted_json), flush=True) - - return LogExportResult.SUCCESS diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/version.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/version.py index 0c880afc5..898210494 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/version.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/version.py @@ -1,4 +1,4 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -__version__ = "0.10.1.dev0" +__version__ = "0.10.1" diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py index c8595cdb7..d6a5903af 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py @@ -635,8 +635,8 @@ def test_shutdown(self, mock_force_flush): mock_force_flush.assert_called_once_with(5000) # pylint: disable=broad-exception-caught - def test_export_method_exists(self): - """Test that _export method exists and can be called.""" + def test_send_log_event_method_exists(self): + """Test that _send_log_event method exists and can be called.""" # Just test that the method exists and doesn't crash with basic input log_event = {"message": "test message", "timestamp": 1234567890} @@ -644,10 +644,10 @@ def test_export_method_exists(self): with patch.object(self.exporter.log_client, "send_log_event") as mock_send: # Should not raise an exception try: - self.exporter._export(log_event) + self.exporter._send_log_event(log_event) mock_send.assert_called_once_with(log_event) except Exception as error: - self.fail(f"_export raised an exception: {error}") + self.fail(f"_send_log_event raised an exception: {error}") if __name__ == "__main__": diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py deleted file mode 100644 index fa0ace40a..000000000 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py +++ /dev/null @@ -1,291 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
-# SPDX-License-Identifier: Apache-2.0 - -import unittest -from unittest.mock import Mock - -from amazon.opentelemetry.distro.exporter.aws.metrics.base_emf_exporter import BaseEmfExporter, MetricRecord -from opentelemetry.sdk.metrics.export import MetricExportResult -from opentelemetry.sdk.resources import Resource - - -class ConcreteEmfExporter(BaseEmfExporter): - """Concrete implementation of BaseEmfExporter for testing.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.exported_logs = [] - - def _export(self, log_event): - """Implementation that stores exported logs for testing.""" - self.exported_logs.append(log_event) - - def force_flush(self, timeout_millis=None): # pylint: disable=no-self-use - """Force flush implementation for testing.""" - return True - - def shutdown(self, timeout_millis=None): # pylint: disable=no-self-use - """Shutdown implementation for testing.""" - return True - - -class TestMetricRecord(unittest.TestCase): - """Test MetricRecord class.""" - - def test_metric_record_initialization(self): - """Test MetricRecord initialization with basic values.""" - record = MetricRecord("test_metric", "Count", "Test description") - - self.assertEqual(record.name, "test_metric") - self.assertEqual(record.unit, "Count") - self.assertEqual(record.description, "Test description") - self.assertIsNone(record.timestamp) - self.assertEqual(record.attributes, {}) - self.assertIsNone(record.value) - self.assertIsNone(record.sum_data) - self.assertIsNone(record.histogram_data) - self.assertIsNone(record.exp_histogram_data) - - -class TestBaseEmfExporter(unittest.TestCase): - """Test BaseEmfExporter class.""" - - def setUp(self): - """Set up test fixtures.""" - self.exporter = ConcreteEmfExporter(namespace="TestNamespace") - - def test_initialization(self): - """Test exporter initialization.""" - exporter = ConcreteEmfExporter() - self.assertEqual(exporter.namespace, "default") - - exporter_custom = ConcreteEmfExporter(namespace="CustomNamespace") - self.assertEqual(exporter_custom.namespace, "CustomNamespace") - - def test_get_metric_name(self): - """Test metric name extraction.""" - # Test with valid name - record = Mock() - record.name = "test_metric" - result = self.exporter._get_metric_name(record) - self.assertEqual(result, "test_metric") - - # Test with empty name - record.name = "" - result = self.exporter._get_metric_name(record) - self.assertIsNone(result) - - # Test with None name - record.name = None - result = self.exporter._get_metric_name(record) - self.assertIsNone(result) - - def test_get_unit(self): - """Test unit mapping functionality.""" - # Test EMF supported units (should return as-is) - record = MetricRecord("test", "Count", "desc") - self.assertEqual(self.exporter._get_unit(record), "Count") - - record = MetricRecord("test", "Percent", "desc") - self.assertEqual(self.exporter._get_unit(record), "Percent") - - # Test OTel unit mapping - record = MetricRecord("test", "ms", "desc") - self.assertEqual(self.exporter._get_unit(record), "Milliseconds") - - record = MetricRecord("test", "s", "desc") - self.assertEqual(self.exporter._get_unit(record), "Seconds") - - record = MetricRecord("test", "By", "desc") - self.assertEqual(self.exporter._get_unit(record), "Bytes") - - # Test units that map to empty string - record = MetricRecord("test", "1", "desc") - self.assertEqual(self.exporter._get_unit(record), "") - - # Test unknown unit - record = MetricRecord("test", "unknown", "desc") - self.assertIsNone(self.exporter._get_unit(record)) 
- - # Test None unit - record = MetricRecord("test", None, "desc") - self.assertIsNone(self.exporter._get_unit(record)) - - def test_get_dimension_names(self): - """Test dimension names extraction.""" - attributes = {"service": "test", "env": "prod"} - result = self.exporter._get_dimension_names(attributes) - self.assertEqual(set(result), {"service", "env"}) - - # Test empty attributes - result = self.exporter._get_dimension_names({}) - self.assertEqual(result, []) - - def test_get_attributes_key(self): - """Test attributes key generation.""" - attrs1 = {"b": "2", "a": "1"} - attrs2 = {"a": "1", "b": "2"} - - key1 = self.exporter._get_attributes_key(attrs1) - key2 = self.exporter._get_attributes_key(attrs2) - - # Keys should be consistent regardless of order - self.assertEqual(key1, key2) - self.assertIsInstance(key1, str) - - def test_normalize_timestamp(self): - """Test timestamp normalization.""" - timestamp_ns = 1609459200000000000 # nanoseconds - expected_ms = 1609459200000 # milliseconds - - result = self.exporter._normalize_timestamp(timestamp_ns) - self.assertEqual(result, expected_ms) - - def test_create_metric_record(self): - """Test metric record creation.""" - record = self.exporter._create_metric_record("test_metric", "Count", "Description") - - self.assertIsInstance(record, MetricRecord) - self.assertEqual(record.name, "test_metric") - self.assertEqual(record.unit, "Count") - self.assertEqual(record.description, "Description") - - def test_convert_gauge_and_sum(self): - """Test gauge and sum conversion.""" - metric = Mock() - metric.name = "test_gauge" - metric.unit = "Count" - metric.description = "Test gauge" - - data_point = Mock() - data_point.value = 42.0 - data_point.attributes = {"key": "value"} - data_point.time_unix_nano = 1609459200000000000 - - record = self.exporter._convert_gauge_and_sum(metric, data_point) - - self.assertEqual(record.name, "test_gauge") - self.assertEqual(record.value, 42.0) - self.assertEqual(record.attributes, {"key": "value"}) - self.assertEqual(record.timestamp, 1609459200000) - - def test_convert_histogram(self): - """Test histogram conversion.""" - metric = Mock() - metric.name = "test_histogram" - metric.unit = "ms" - metric.description = "Test histogram" - - data_point = Mock() - data_point.count = 5 - data_point.sum = 25.0 - data_point.min = 1.0 - data_point.max = 10.0 - data_point.attributes = {"service": "test"} - data_point.time_unix_nano = 1609459200000000000 - - record = self.exporter._convert_histogram(metric, data_point) - - self.assertEqual(record.name, "test_histogram") - expected_data = {"Count": 5, "Sum": 25.0, "Min": 1.0, "Max": 10.0} - self.assertEqual(record.histogram_data, expected_data) - self.assertEqual(record.attributes, {"service": "test"}) - - def test_convert_exp_histogram(self): - """Test exponential histogram conversion.""" - metric = Mock() - metric.name = "test_exp_histogram" - metric.unit = "s" - metric.description = "Test exponential histogram" - - data_point = Mock() - data_point.count = 10 - data_point.sum = 50.0 - data_point.min = 1.0 - data_point.max = 20.0 - data_point.scale = 1 - data_point.zero_count = 0 - data_point.attributes = {"env": "test"} - data_point.time_unix_nano = 1609459200000000000 - - # Mock buckets - data_point.positive = Mock() - data_point.positive.offset = 0 - data_point.positive.bucket_counts = [1, 2, 1] - - data_point.negative = Mock() - data_point.negative.offset = 0 - data_point.negative.bucket_counts = [] - - record = self.exporter._convert_exp_histogram(metric, data_point) - - 
self.assertEqual(record.name, "test_exp_histogram") - self.assertIsNotNone(record.exp_histogram_data) - self.assertIn("Values", record.exp_histogram_data) - self.assertIn("Counts", record.exp_histogram_data) - self.assertEqual(record.exp_histogram_data["Count"], 10) - self.assertEqual(record.exp_histogram_data["Sum"], 50.0) - - def test_group_by_attributes_and_timestamp(self): - """Test grouping by attributes and timestamp.""" - record = Mock() - record.attributes = {"env": "test"} - record.timestamp = 1234567890 - - result = self.exporter._group_by_attributes_and_timestamp(record) - - self.assertIsInstance(result, tuple) - self.assertEqual(len(result), 2) - self.assertEqual(result[1], 1234567890) - - def test_create_emf_log(self): - """Test EMF log creation.""" - # Create a simple metric record - record = self.exporter._create_metric_record("test_metric", "Count", "Test") - record.value = 50.0 - record.timestamp = 1234567890 - record.attributes = {"env": "test"} - - records = [record] - resource = Resource.create({"service.name": "test-service"}) - - result = self.exporter._create_emf_log(records, resource, 1234567890) - - # Check basic EMF structure - self.assertIn("_aws", result) - self.assertIn("CloudWatchMetrics", result["_aws"]) - self.assertEqual(result["_aws"]["Timestamp"], 1234567890) - self.assertEqual(result["Version"], "1") - - # Check metric value - self.assertEqual(result["test_metric"], 50.0) - - # Check resource attributes - self.assertEqual(result["otel.resource.service.name"], "test-service") - - # Check CloudWatch metrics - cw_metrics = result["_aws"]["CloudWatchMetrics"][0] - self.assertEqual(cw_metrics["Namespace"], "TestNamespace") - self.assertEqual(cw_metrics["Metrics"][0]["Name"], "test_metric") - - def test_export_empty_metrics(self): - """Test export with empty metrics data.""" - metrics_data = Mock() - metrics_data.resource_metrics = [] - - result = self.exporter.export(metrics_data) - self.assertEqual(result, MetricExportResult.SUCCESS) - - def test_export_failure_handling(self): - """Test export failure handling.""" - metrics_data = Mock() - # Make iteration fail - metrics_data.resource_metrics = Mock() - metrics_data.resource_metrics.__iter__ = Mock(side_effect=Exception("Test exception")) - - result = self.exporter.export(metrics_data) - self.assertEqual(result, MetricExportResult.FAILURE) - - -if __name__ == "__main__": - unittest.main() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py deleted file mode 100644 index 8783dfd92..000000000 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py +++ /dev/null @@ -1,288 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
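The assertions in test_create_emf_log above pin down the shape of the EMF document the base exporter builds. A minimal record consistent with those assertions, with an illustrative dimension set and resource attribute values, might look like:

    # Minimal EMF document consistent with the test_create_emf_log assertions above.
    # The "Dimensions" entry and attribute values are illustrative.
    emf_log = {
        "_aws": {
            "Timestamp": 1234567890,  # epoch milliseconds
            "CloudWatchMetrics": [
                {
                    "Namespace": "TestNamespace",
                    "Dimensions": [["env"]],
                    "Metrics": [{"Name": "test_metric", "Unit": "Count"}],
                }
            ],
        },
        "Version": "1",
        "env": "test",  # dimension values sit at the top level
        "test_metric": 50.0,  # metric values sit at the top level
        "otel.resource.service.name": "test-service",  # resource attrs, prefixed
    }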
-# SPDX-License-Identifier: Apache-2.0 - -import unittest -from io import StringIO -from unittest.mock import MagicMock, patch - -from amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter import ConsoleEmfExporter -from opentelemetry.sdk.metrics import Counter -from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricExportResult, MetricsData - - -class TestConsoleEmfExporter(unittest.TestCase): - """Test ConsoleEmfExporter class.""" - - def setUp(self): - """Set up test fixtures.""" - self.exporter = ConsoleEmfExporter() - - def test_namespace_initialization(self): - """Test exporter initialization with different namespace scenarios.""" - # Test default namespace - exporter = ConsoleEmfExporter() - self.assertEqual(exporter.namespace, "default") - - # Test custom namespace - exporter = ConsoleEmfExporter(namespace="CustomNamespace") - self.assertEqual(exporter.namespace, "CustomNamespace") - - # Test None namespace (should default to 'default') - exporter = ConsoleEmfExporter(namespace=None) - self.assertEqual(exporter.namespace, "default") - - # Test empty string namespace (should remain empty) - exporter = ConsoleEmfExporter(namespace="") - self.assertEqual(exporter.namespace, "") - - def test_initialization_with_parameters(self): - """Test exporter initialization with optional parameters.""" - # Test with preferred_temporality - preferred_temporality = {Counter: AggregationTemporality.CUMULATIVE} - exporter = ConsoleEmfExporter(namespace="TestNamespace", preferred_temporality=preferred_temporality) - self.assertEqual(exporter.namespace, "TestNamespace") - self.assertEqual(exporter._preferred_temporality[Counter], AggregationTemporality.CUMULATIVE) - - # Test with preferred_aggregation - preferred_aggregation = {Counter: "TestAggregation"} - exporter = ConsoleEmfExporter(preferred_aggregation=preferred_aggregation) - self.assertEqual(exporter._preferred_aggregation[Counter], "TestAggregation") - - # Test with additional kwargs - exporter = ConsoleEmfExporter(namespace="TestNamespace", extra_param="ignored") # Should be ignored - self.assertEqual(exporter.namespace, "TestNamespace") - - def test_export_log_event_success(self): - """Test that log events are properly sent to console output.""" - # Create a simple log event with EMF-formatted message - test_message = ( - '{"_aws":{"Timestamp":1640995200000,"CloudWatchMetrics":[{"Namespace":"TestNamespace",' - '"Dimensions":[["Service"]],"Metrics":[{"Name":"TestMetric","Unit":"Count"}]}]},' - '"Service":"test-service","TestMetric":42}' - ) - log_event = {"message": test_message, "timestamp": 1640995200000} - - # Capture stdout to verify the output - with patch("sys.stdout", new_callable=StringIO) as mock_stdout: - self.exporter._export(log_event) - - # Verify the message was printed to stdout with flush - captured_output = mock_stdout.getvalue().strip() - self.assertEqual(captured_output, test_message) - - def test_export_log_event_empty_message(self): - """Test handling of log events with empty messages.""" - log_event = {"message": "", "timestamp": 1640995200000} - - with patch("sys.stdout", new_callable=StringIO) as mock_stdout: - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - self.exporter._export(log_event) - - # Should not print anything for empty message - captured_output = mock_stdout.getvalue().strip() - self.assertEqual(captured_output, "") - - # Should log a warning - mock_logger.warning.assert_called_once() - - def 
test_export_log_event_missing_message(self): - """Test handling of log events without message key.""" - log_event = {"timestamp": 1640995200000} - - with patch("sys.stdout", new_callable=StringIO) as mock_stdout: - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - self.exporter._export(log_event) - - # Should not print anything when message is missing - captured_output = mock_stdout.getvalue().strip() - self.assertEqual(captured_output, "") - - # Should log a warning - mock_logger.warning.assert_called_once() - - def test_export_log_event_with_none_message(self): - """Test handling of log events with None message.""" - log_event = {"message": None, "timestamp": 1640995200000} - - with patch("sys.stdout", new_callable=StringIO) as mock_stdout: - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - self.exporter._export(log_event) - - # Should not print anything when message is None - captured_output = mock_stdout.getvalue().strip() - self.assertEqual(captured_output, "") - - # Should log a warning - mock_logger.warning.assert_called_once() - - def test_export_log_event_print_exception(self): - """Test error handling when print() raises an exception.""" - log_event = {"message": "test message", "timestamp": 1640995200000} - - with patch("builtins.print", side_effect=Exception("Print failed")): - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - self.exporter._export(log_event) - - # Should log the error - mock_logger.error.assert_called_once() - args = mock_logger.error.call_args[0] - self.assertIn("Failed to write EMF log to console", args[0]) - self.assertEqual(args[1], log_event) - self.assertIn("Print failed", str(args[2])) - - def test_export_log_event_various_message_types(self): - """Test _export with various message types.""" - # Test with JSON string - json_message = '{"key": "value"}' - log_event = {"message": json_message, "timestamp": 1640995200000} - - with patch("sys.stdout", new_callable=StringIO) as mock_stdout: - self.exporter._export(log_event) - - captured_output = mock_stdout.getvalue().strip() - self.assertEqual(captured_output, json_message) - - # Test with plain string - plain_message = "Simple log message" - log_event = {"message": plain_message, "timestamp": 1640995200000} - - with patch("sys.stdout", new_callable=StringIO) as mock_stdout: - self.exporter._export(log_event) - - captured_output = mock_stdout.getvalue().strip() - self.assertEqual(captured_output, plain_message) - - def test_force_flush(self): - """Test force_flush method.""" - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - # Test with default timeout - result = self.exporter.force_flush() - self.assertTrue(result) - mock_logger.debug.assert_called_once() - - # Reset mock for next call - mock_logger.reset_mock() - - # Test with custom timeout - result = self.exporter.force_flush(timeout_millis=5000) - self.assertTrue(result) - mock_logger.debug.assert_called_once() - - def test_shutdown(self): - """Test shutdown method.""" - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - # Test with no timeout - result = self.exporter.shutdown() - self.assertTrue(result) - mock_logger.debug.assert_called_once_with("ConsoleEmfExporter shutdown called with timeout_millis=%s", None) - - # Reset mock for next call - 
mock_logger.reset_mock() - - # Test with timeout - result = self.exporter.shutdown(timeout_millis=3000) - self.assertTrue(result) - mock_logger.debug.assert_called_once_with("ConsoleEmfExporter shutdown called with timeout_millis=%s", 3000) - - # Reset mock for next call - mock_logger.reset_mock() - - # Test with additional kwargs - result = self.exporter.shutdown(timeout_millis=3000, extra_arg="ignored") - self.assertTrue(result) - mock_logger.debug.assert_called_once() - - def test_integration_with_metrics_data(self): - """Test the full export flow with actual MetricsData.""" - # Create a mock MetricsData - mock_metrics_data = MagicMock(spec=MetricsData) - mock_resource_metrics = MagicMock() - mock_scope_metrics = MagicMock() - mock_metric = MagicMock() - - # Set up the mock hierarchy - mock_metrics_data.resource_metrics = [mock_resource_metrics] - mock_resource_metrics.scope_metrics = [mock_scope_metrics] - mock_scope_metrics.metrics = [mock_metric] - - # Mock the metric to have no data_points to avoid complex setup - mock_metric.data = None - - with patch("sys.stdout", new_callable=StringIO): - result = self.exporter.export(mock_metrics_data) - - # Should succeed even with no actual metrics - self.assertEqual(result, MetricExportResult.SUCCESS) - - def test_integration_export_success(self): - """Test export method returns success.""" - # Create empty MetricsData - mock_metrics_data = MagicMock(spec=MetricsData) - mock_metrics_data.resource_metrics = [] - - result = self.exporter.export(mock_metrics_data) - self.assertEqual(result, MetricExportResult.SUCCESS) - - def test_integration_export_with_timeout(self): - """Test export method with timeout parameter.""" - mock_metrics_data = MagicMock(spec=MetricsData) - mock_metrics_data.resource_metrics = [] - - result = self.exporter.export(mock_metrics_data, timeout_millis=5000) - self.assertEqual(result, MetricExportResult.SUCCESS) - - def test_export_failure_handling(self): - """Test export method handles exceptions and returns failure.""" - # Create a mock that raises an exception - mock_metrics_data = MagicMock(spec=MetricsData) - mock_metrics_data.resource_metrics = [MagicMock()] - - # Make the resource_metrics access raise an exception - type(mock_metrics_data).resource_metrics = property( - lambda self: (_ for _ in ()).throw(Exception("Test exception")) - ) - - # Patch the logger in the base_emf_exporter since that's where the error logging happens - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.base_emf_exporter.logger") as mock_logger: - result = self.exporter.export(mock_metrics_data) - - self.assertEqual(result, MetricExportResult.FAILURE) - mock_logger.error.assert_called_once() - self.assertIn("Failed to export metrics", mock_logger.error.call_args[0][0]) - - def test_flush_output_verification(self): - """Test that print is called with flush=True.""" - log_event = {"message": "test message", "timestamp": 1640995200000} - - with patch("builtins.print") as mock_print: - self.exporter._export(log_event) - - # Verify print was called with flush=True - mock_print.assert_called_once_with("test message", flush=True) - - def test_logger_levels(self): - """Test that appropriate log levels are used.""" - # Test debug logging in force_flush - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - self.exporter.force_flush() - mock_logger.debug.assert_called_once() - - # Test debug logging in shutdown - with 
patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - self.exporter.shutdown() - mock_logger.debug.assert_called_once() - - # Test warning logging for empty message - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - self.exporter._export({"message": "", "timestamp": 123}) - mock_logger.warning.assert_called_once() - - # Test error logging for exception - with patch("builtins.print", side_effect=Exception("Test")): - with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: - self.exporter._export({"message": "test", "timestamp": 123}) - mock_logger.error.assert_called_once() - - -if __name__ == "__main__": - unittest.main() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py deleted file mode 100644 index 5000102e8..000000000 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -# SPDX-License-Identifier: Apache-2.0 -import unittest -from unittest.mock import Mock, patch - -from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import CompactConsoleLogExporter -from opentelemetry.sdk._logs.export import LogExportResult - - -class TestCompactConsoleLogExporter(unittest.TestCase): - - def setUp(self): - self.exporter = CompactConsoleLogExporter() - - @patch("builtins.print") - def test_export_compresses_json(self, mock_print): - # Mock log data - mock_log_data = Mock() - mock_log_record = Mock() - mock_log_data.log_record = mock_log_record - - # Mock formatted JSON with whitespace - formatted_json = '{\n "body": "test message",\n "severity_number": 9,\n "attributes": {\n "key": "value"\n }\n}' # noqa: E501 - self.exporter.formatter = Mock(return_value=formatted_json) - - # Call export - result = self.exporter.export([mock_log_data]) - - # Verify result - self.assertEqual(result, LogExportResult.SUCCESS) - - # Verify print calls - self.assertEqual(mock_print.call_count, 1) - mock_print.assert_called_with( - '{"body":"test message","severity_number":9,"attributes":{"key":"value"}}', flush=True - ) - - @patch("builtins.print") - def test_export_multiple_records(self, mock_print): - # Mock multiple log data - mock_log_data1 = Mock() - mock_log_data2 = Mock() - mock_log_data1.log_record = Mock() - mock_log_data2.log_record = Mock() - - formatted_json = '{\n "body": "test"\n}' - self.exporter.formatter = Mock(return_value=formatted_json) - - # Call export - result = self.exporter.export([mock_log_data1, mock_log_data2]) - - # Verify result - self.assertEqual(result, LogExportResult.SUCCESS) - - # Verify print calls - self.assertEqual(mock_print.call_count, 2) # 2 records - # Each record should print compact JSON - expected_calls = [unittest.mock.call('{"body":"test"}', flush=True)] * 2 - mock_print.assert_has_calls(expected_calls) - - @patch("builtins.print") - def test_export_empty_batch(self, mock_print): - # Call export with empty batch - result = self.exporter.export([]) - - # Verify result - self.assertEqual(result, LogExportResult.SUCCESS) - - # Verify print calls - mock_print.assert_not_called() # No records, no prints diff --git 
a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index efcc5a317..9e4afc81e 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -25,7 +25,6 @@ AwsOpenTelemetryConfigurator, OtlpLogHeaderSetting, _check_emf_exporter_enabled, - _clear_logs_header_cache, _create_aws_otlp_exporter, _create_emf_exporter, _custom_import_sampler, @@ -38,16 +37,16 @@ _customize_span_processors, _export_unsampled_span_for_agent_observability, _export_unsampled_span_for_lambda, - _fetch_logs_header, _init_logging, _is_application_signals_enabled, _is_application_signals_runtime_enabled, _is_defer_to_workers_enabled, _is_wsgi_master_process, + _validate_and_fetch_logs_header, ) from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor -from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import CompactConsoleLogExporter +from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession # pylint: disable=line-too-long @@ -71,7 +70,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.metrics import get_meter_provider from opentelemetry.processor.baggage import BaggageSpanProcessor -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -509,7 +508,6 @@ def test_customize_span_exporter_sigv4(self): bad_configs.append(config) for config in good_configs: - _clear_logs_header_cache() self.customize_exporter_test( config, _customize_span_exporter, @@ -521,7 +519,6 @@ def test_customize_span_exporter_sigv4(self): ) for config in bad_configs: - _clear_logs_header_cache() self.customize_exporter_test( config, _customize_span_exporter, @@ -619,7 +616,6 @@ def test_customize_logs_exporter_sigv4(self): bad_configs.append(config) for config in good_configs: - _clear_logs_header_cache() self.customize_exporter_test( config, _customize_logs_exporter, @@ -630,7 +626,6 @@ def test_customize_logs_exporter_sigv4(self): ) for config in bad_configs: - _clear_logs_header_cache() self.customize_exporter_test( config, _customize_logs_exporter, OTLPLogExporter(), OTLPLogExporter, Session, Compression.NoCompression ) @@ -681,58 +676,6 @@ def capture_exporter(*args, **kwargs): os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.LoggingHandler", return_value=MagicMock()) - @patch("logging.getLogger", return_value=MagicMock()) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._customize_logs_exporter") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.LoggerProvider", return_value=MagicMock()) - 
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._customize_log_record_processor") - def test_init_logging_console_exporter_replacement( - self, - mock_customize_processor, - mock_logger_provider, - mock_customize_logs_exporter, - mock_get_logger, - mock_logging_handler, - ): - """Test that ConsoleLogExporter is replaced with CompactConsoleLogExporter when in Lambda""" - - # Mock _is_lambda_environment to return True - with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment", return_value=True - ): - # Test with ConsoleLogExporter - exporters = {"console": ConsoleLogExporter} - _init_logging(exporters, Resource.get_empty()) - - # Verify that _customize_log_record_processor was called - mock_customize_processor.assert_called_once() - - # Get the exporter that was passed to _customize_logs_exporter - call_args = mock_customize_logs_exporter.call_args - exporter_instance = call_args[0][0] - - # Verify it's a CompactConsoleLogExporter instance - self.assertIsInstance(exporter_instance, CompactConsoleLogExporter) - - # Reset mocks - mock_customize_processor.reset_mock() - mock_customize_logs_exporter.reset_mock() - - # Test when not in Lambda environment - should not replace - with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment", return_value=False - ): - exporters = {"console": ConsoleLogExporter} - _init_logging(exporters, Resource.get_empty()) - - # Get the exporter that was passed to _customize_logs_exporter - call_args = mock_customize_logs_exporter.call_args - exporter_instance = call_args[0][0] - - # Verify it's still a regular ConsoleLogExporter - self.assertIsInstance(exporter_instance, ConsoleLogExporter) - self.assertNotIsInstance(exporter_instance, CompactConsoleLogExporter) - def test_customize_span_processors(self): mock_tracer_provider: TracerProvider = MagicMock() os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) @@ -1021,72 +964,64 @@ def test_check_emf_exporter_enabled(self): # Clean up os.environ.pop("OTEL_METRICS_EXPORTER", None) - def test_fetch_logs_header(self): - _clear_logs_header_cache() - + def test_validate_and_fetch_logs_header(self): # Test when headers are not set os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) - result = _fetch_logs_header() + result = _validate_and_fetch_logs_header() self.assertIsInstance(result, OtlpLogHeaderSetting) self.assertIsNone(result.log_group) self.assertIsNone(result.log_stream) self.assertIsNone(result.namespace) - self.assertFalse(result.is_valid()) + self.assertFalse(result.is_valid) - # Test singleton behavior - should return the same cached instance - result2 = _fetch_logs_header() - self.assertIs(result, result2) # Same object reference - - _clear_logs_header_cache() + # Test with valid headers os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group,x-aws-log-stream=test-stream" - result = _fetch_logs_header() + result = _validate_and_fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertEqual(result.log_stream, "test-stream") self.assertIsNone(result.namespace) - self.assertTrue(result.is_valid()) - - # Test singleton behavior again - result2 = _fetch_logs_header() - self.assertIs(result, result2) + self.assertTrue(result.is_valid) - _clear_logs_header_cache() + # Test with valid headers including namespace os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = ( "x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace" ) - result = 
_fetch_logs_header() + result = _validate_and_fetch_logs_header() + self.assertEqual(result.log_group, "test-group") + self.assertEqual(result.log_stream, "test-stream") self.assertEqual(result.namespace, "test-namespace") - self.assertTrue(result.is_valid()) + self.assertTrue(result.is_valid) - _clear_logs_header_cache() + # Test with missing log group os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-stream=test-stream" - result = _fetch_logs_header() + result = _validate_and_fetch_logs_header() + self.assertIsNone(result.log_group) self.assertEqual(result.log_stream, "test-stream") - self.assertFalse(result.is_valid()) + self.assertFalse(result.is_valid) - _clear_logs_header_cache() + # Test with missing log stream os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group" - result = _fetch_logs_header() + result = _validate_and_fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertIsNone(result.log_stream) - self.assertFalse(result.is_valid()) + self.assertFalse(result.is_valid) - _clear_logs_header_cache() + # Test with empty value in log group os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=,x-aws-log-stream=test-stream" - result = _fetch_logs_header() + result = _validate_and_fetch_logs_header() self.assertIsNone(result.log_group) self.assertEqual(result.log_stream, "test-stream") - self.assertFalse(result.is_valid()) + self.assertFalse(result.is_valid) - _clear_logs_header_cache() + # Test with empty value in log stream os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group,x-aws-log-stream=" - result = _fetch_logs_header() + result = _validate_and_fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertIsNone(result.log_stream) - self.assertFalse(result.is_valid()) + self.assertFalse(result.is_valid) # Clean up os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) - _clear_logs_header_cache() @patch( "amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled", return_value=False @@ -1116,6 +1051,63 @@ def test_customize_log_record_processor_with_agent_observability(self, _): added_processor = mock_logger_provider.add_log_record_processor.call_args[0][0] self.assertIsInstance(added_processor, AwsCloudWatchOtlpBatchLogRecordProcessor) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._validate_and_fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter(self, mock_get_session, mock_validate): + # Test when botocore is not installed + mock_get_session.return_value = None + result = _create_emf_exporter() + self.assertIsNone(result) + + # Reset mock for subsequent tests + mock_get_session.reset_mock() + mock_get_session.return_value = MagicMock() + + # Mock the EMF exporter class import by patching the module import + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" + ) as mock_emf_exporter_class: + mock_exporter_instance = MagicMock() + mock_exporter_instance.namespace = "default" + mock_exporter_instance.log_group_name = "test-group" + mock_emf_exporter_class.return_value = mock_exporter_instance + + # Test when headers are invalid + mock_validate.return_value = OtlpLogHeaderSetting(None, None, None, False) + result = _create_emf_exporter() + self.assertIsNone(result) + + # Test when namespace is missing (should still create exporter with default namespace) + 
mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", None, True) + result = _create_emf_exporter() + self.assertIsNotNone(result) + self.assertEqual(result, mock_exporter_instance) + # Verify that the EMF exporter was called with correct parameters + mock_emf_exporter_class.assert_called_with( + session=mock_get_session.return_value, + namespace=None, + log_group_name="test-group", + log_stream_name="test-stream", + ) + + # Test with valid configuration + mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", "test-namespace", True) + result = _create_emf_exporter() + self.assertIsNotNone(result) + self.assertEqual(result, mock_exporter_instance) + # Verify that the EMF exporter was called with correct parameters + mock_emf_exporter_class.assert_called_with( + session=mock_get_session.return_value, + namespace="test-namespace", + log_group_name="test-group", + log_stream_name="test-stream", + ) + + # Test exception handling + mock_validate.side_effect = Exception("Test exception") + result = _create_emf_exporter() + self.assertIsNone(result) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") @@ -1182,6 +1174,35 @@ def test_create_aws_otlp_exporter(self, mock_get_session, mock_is_agent_enabled, result = _create_aws_otlp_exporter("https://xray.us-east-1.amazonaws.com/v1/traces", "xray", "us-east-1") self.assertIsNone(result) + def test_customize_metric_exporters_with_emf(self): + metric_readers = [] + views = [] + + # Test with EMF disabled + _customize_metric_exporters(metric_readers, views, is_emf_enabled=False) + self.assertEqual(len(metric_readers), 0) + + # Test with EMF enabled but create_emf_exporter returns None + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator._create_emf_exporter", return_value=None + ): + _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) + self.assertEqual(len(metric_readers), 0) + + # Test with EMF enabled and valid exporter + mock_emf_exporter = MagicMock(spec=AwsCloudWatchEmfExporter) + # Add the required attributes that PeriodicExportingMetricReader expects + mock_emf_exporter._preferred_temporality = {} + mock_emf_exporter._preferred_aggregation = {} + + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator._create_emf_exporter", + return_value=mock_emf_exporter, + ): + _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) + self.assertEqual(len(metric_readers), 1) + self.assertIsInstance(metric_readers[0], PeriodicExportingMetricReader) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_service_attribute") def test_customize_resource_without_agent_observability(self, mock_get_service_attribute, mock_is_agent_enabled): @@ -1229,202 +1250,6 @@ def test_customize_resource_with_existing_agent_type(self, mock_get_service_attr self.assertEqual(result.attributes[AWS_LOCAL_SERVICE], "test-service") self.assertEqual(result.attributes[AWS_SERVICE_TYPE], "existing-agent") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") - 
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - def test_create_emf_exporter_lambda_without_valid_headers( - self, mock_get_session, mock_is_lambda, mock_fetch_headers - ): - """Test _create_emf_exporter returns ConsoleEmfExporter for Lambda without valid log headers""" - # Setup mocks - mock_is_lambda.return_value = True - mock_header_setting = MagicMock() - mock_header_setting.is_valid.return_value = False - mock_header_setting.namespace = "test-namespace" - mock_fetch_headers.return_value = mock_header_setting - - with patch( - "amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.ConsoleEmfExporter" - ) as mock_console_exporter: - mock_exporter_instance = MagicMock() - mock_console_exporter.return_value = mock_exporter_instance - - result = _create_emf_exporter() - - self.assertEqual(result, mock_exporter_instance) - mock_console_exporter.assert_called_once_with(namespace="test-namespace") - - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - def test_create_emf_exporter_lambda_with_valid_headers(self, mock_get_session, mock_is_lambda, mock_fetch_headers): - """Test _create_emf_exporter returns AwsCloudWatchEmfExporter for Lambda with valid headers""" - # Setup mocks - mock_is_lambda.return_value = True - mock_session = MagicMock() - mock_get_session.return_value = mock_session - - mock_header_setting = MagicMock() - mock_header_setting.is_valid.return_value = True - mock_header_setting.namespace = "test-namespace" - mock_header_setting.log_group = "test-group" - mock_header_setting.log_stream = "test-stream" - mock_fetch_headers.return_value = mock_header_setting - - with patch( - "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" - ) as mock_cloudwatch_exporter: - mock_exporter_instance = MagicMock() - mock_cloudwatch_exporter.return_value = mock_exporter_instance - - result = _create_emf_exporter() - - self.assertEqual(result, mock_exporter_instance) - mock_cloudwatch_exporter.assert_called_once_with( - session=mock_session, - namespace="test-namespace", - log_group_name="test-group", - log_stream_name="test-stream", - ) - - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - def test_create_emf_exporter_non_lambda_with_valid_headers( - self, mock_get_session, mock_is_lambda, mock_fetch_headers - ): - """Test _create_emf_exporter returns AwsCloudWatchEmfExporter for non-Lambda with valid headers""" - # Setup mocks - mock_is_lambda.return_value = False - mock_session = MagicMock() - mock_get_session.return_value = mock_session - - mock_header_setting = MagicMock() - mock_header_setting.is_valid.return_value = True - mock_header_setting.namespace = "test-namespace" - mock_header_setting.log_group = "test-group" - mock_header_setting.log_stream = "test-stream" - mock_fetch_headers.return_value = mock_header_setting - - with patch( - "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" - ) as mock_cloudwatch_exporter: - mock_exporter_instance = MagicMock() - mock_cloudwatch_exporter.return_value = 
mock_exporter_instance - - result = _create_emf_exporter() - - self.assertEqual(result, mock_exporter_instance) - mock_cloudwatch_exporter.assert_called_once_with( - session=mock_session, - namespace="test-namespace", - log_group_name="test-group", - log_stream_name="test-stream", - ) - - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - def test_create_emf_exporter_non_lambda_without_valid_headers( - self, mock_get_session, mock_is_lambda, mock_fetch_headers - ): - """Test _create_emf_exporter returns None for non-Lambda without valid headers""" - # Setup mocks - mock_is_lambda.return_value = False - mock_session = MagicMock() - mock_get_session.return_value = mock_session - - mock_header_setting = MagicMock() - mock_header_setting.is_valid.return_value = False - mock_fetch_headers.return_value = mock_header_setting - - result = _create_emf_exporter() - - self.assertIsNone(result) - - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") - def test_create_emf_exporter_no_botocore_session( - self, mock_logger, mock_get_session, mock_is_lambda, mock_fetch_headers - ): - """Test _create_emf_exporter returns None when botocore session is not available""" - # Setup mocks - mock_is_lambda.return_value = False - mock_get_session.return_value = None # Simulate missing botocore - - mock_header_setting = MagicMock() - mock_header_setting.is_valid.return_value = True - mock_fetch_headers.return_value = mock_header_setting - - result = _create_emf_exporter() - - self.assertIsNone(result) - mock_logger.warning.assert_called_once_with("botocore is not installed. 
EMF exporter requires botocore") - - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") - def test_create_emf_exporter_exception_handling(self, mock_logger, mock_fetch_headers): - """Test _create_emf_exporter handles exceptions gracefully""" - # Setup mocks to raise exception - test_exception = Exception("Test exception") - mock_fetch_headers.side_effect = test_exception - - result = _create_emf_exporter() - - self.assertIsNone(result) - mock_logger.error.assert_called_once_with("Failed to create EMF exporter: %s", test_exception) - - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - def test_create_emf_exporter_lambda_without_valid_headers_none_namespace( - self, mock_get_session, mock_is_lambda, mock_fetch_headers - ): - """Test _create_emf_exporter with Lambda environment and None namespace""" - # Setup mocks - mock_is_lambda.return_value = True - mock_header_setting = MagicMock() - mock_header_setting.is_valid.return_value = False - mock_header_setting.namespace = None - mock_fetch_headers.return_value = mock_header_setting - - with patch( - "amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.ConsoleEmfExporter" - ) as mock_console_exporter: - mock_exporter_instance = MagicMock() - mock_console_exporter.return_value = mock_exporter_instance - - result = _create_emf_exporter() - - self.assertEqual(result, mock_exporter_instance) - mock_console_exporter.assert_called_once_with(namespace=None) - - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - def test_create_emf_exporter_cloudwatch_exporter_import_error( - self, mock_get_session, mock_is_lambda, mock_fetch_headers - ): - """Test _create_emf_exporter handles import errors for CloudWatch exporter""" - # Setup mocks - mock_is_lambda.return_value = False - mock_session = MagicMock() - mock_get_session.return_value = mock_session - - mock_header_setting = MagicMock() - mock_header_setting.is_valid.return_value = True - mock_fetch_headers.return_value = mock_header_setting - - # Mock import to raise ImportError - with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger: - with patch("builtins.__import__", side_effect=ImportError("Cannot import CloudWatch exporter")): - result = _create_emf_exporter() - - self.assertIsNone(result) - mock_logger.error.assert_called_once() - def validate_distro_environ(): tc: TestCase = TestCase() diff --git a/lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py b/lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py index 23f005f6a..246652956 100644 --- a/lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py +++ b/lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py @@ -46,7 +46,6 @@ def lambda_handler(event, context): tracer_provider (TracerProvider) - an optional tracer provider meter_provider (MeterProvider) - an optional meter provider -logger_provider (LoggerProvider) - an optional logger provider event_context_extractor (Callable) - a 
function that returns an OTel Trace Context given the Lambda Event the AWS Lambda was invoked with this function signature is: def event_context_extractor(lambda_event: Any) -> Context @@ -78,7 +77,6 @@ def custom_event_context_extractor(lambda_event): from wrapt import wrap_function_wrapper from opentelemetry import context as context_api -from opentelemetry._logs import LoggerProvider, get_logger_provider from opentelemetry.context.context import Context from opentelemetry.instrumentation.aws_lambda.package import _instruments from opentelemetry.instrumentation.aws_lambda.version import __version__ @@ -96,7 +94,9 @@ def custom_event_context_extractor(lambda_event): _HANDLER = "_HANDLER" _X_AMZN_TRACE_ID = "_X_AMZN_TRACE_ID" ORIG_HANDLER = "ORIG_HANDLER" -OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT" +OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = ( + "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT" +) def _default_event_context_extractor(lambda_event: Any) -> Context: @@ -157,13 +157,17 @@ def _determine_parent_context( return event_context_extractor(lambda_event) -def _set_api_gateway_v1_proxy_attributes(lambda_event: Any, span: Span) -> Span: +def _set_api_gateway_v1_proxy_attributes( + lambda_event: Any, span: Span +) -> Span: """Sets HTTP attributes for REST APIs and v1 HTTP APIs More info: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format """ - span.set_attribute(SpanAttributes.HTTP_METHOD, lambda_event.get("httpMethod")) + span.set_attribute( + SpanAttributes.HTTP_METHOD, lambda_event.get("httpMethod") + ) if lambda_event.get("headers"): if "User-Agent" in lambda_event["headers"]: @@ -190,12 +194,16 @@ def _set_api_gateway_v1_proxy_attributes(lambda_event: Any, span: Span) -> Span: f"{lambda_event['resource']}?{urlencode(lambda_event['queryStringParameters'])}", ) else: - span.set_attribute(SpanAttributes.HTTP_TARGET, lambda_event["resource"]) + span.set_attribute( + SpanAttributes.HTTP_TARGET, lambda_event["resource"] + ) return span -def _set_api_gateway_v2_proxy_attributes(lambda_event: Any, span: Span) -> Span: +def _set_api_gateway_v2_proxy_attributes( + lambda_event: Any, span: Span +) -> Span: """Sets HTTP attributes for v2 HTTP APIs More info: @@ -245,7 +253,6 @@ def _instrument( event_context_extractor: Callable[[Any], Context], tracer_provider: TracerProvider = None, meter_provider: MeterProvider = None, - logger_provider: LoggerProvider = None, ): # pylint: disable=too-many-locals @@ -254,7 +261,9 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches call_wrapped, instance, args, kwargs ): - orig_handler_name = ".".join([wrapped_module_name, wrapped_function_name]) + orig_handler_name = ".".join( + [wrapped_module_name, wrapped_function_name] + ) lambda_event = args[0] @@ -297,7 +306,9 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches # # See more: # https://github.com/open-telemetry/semantic-conventions/blob/main/docs/faas/aws-lambda.md#all-triggers - account_id = lambda_context.invoked_function_arn.split(":")[4] + account_id = lambda_context.invoked_function_arn.split( + ":" + )[4] span.set_attribute( ResourceAttributes.CLOUD_ACCOUNT_ID, account_id, @@ -315,13 +326,19 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches # If the request came from an API Gateway, extract http attributes from the event # 
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/instrumentation/aws-lambda.md#api-gateway # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-server-semantic-conventions - if isinstance(lambda_event, dict) and lambda_event.get("requestContext"): + if isinstance(lambda_event, dict) and lambda_event.get( + "requestContext" + ): span.set_attribute(SpanAttributes.FAAS_TRIGGER, "http") if lambda_event.get("version") == "2.0": - _set_api_gateway_v2_proxy_attributes(lambda_event, span) + _set_api_gateway_v2_proxy_attributes( + lambda_event, span + ) else: - _set_api_gateway_v1_proxy_attributes(lambda_event, span) + _set_api_gateway_v1_proxy_attributes( + lambda_event, span + ) if isinstance(result, dict) and result.get("statusCode"): span.set_attribute( @@ -360,22 +377,6 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches " case of a Lambda freeze and would exist in the OTel SDK implementation." ) - _logger_provider = logger_provider or get_logger_provider() - if hasattr(_logger_provider, "force_flush"): - rem = flush_timeout - (time.time() - now) * 1000 - if rem > 0: - try: - # NOTE: `force_flush` before function quit in case of Lambda freeze. - _logger_provider.force_flush(rem) - except Exception: # pylint: disable=broad-except - logger.exception("LoggerProvider failed to flush logs") - else: - logger.debug( - "LoggerProvider (%s) was missing `force_flush` method. This is necessary in" - " case of a Lambda freeze and would exist in the OTel SDK implementation.", - _logger_provider.__class__.__name__, - ) - if exception is not None: raise exception.with_traceback(exception.__traceback__) @@ -402,7 +403,6 @@ def _instrument(self, **kwargs): **kwargs: Optional arguments ``tracer_provider``: a TracerProvider, defaults to global ``meter_provider``: a MeterProvider, defaults to global - ``logger_provider``: a LoggerProvider, defaults to global ``event_context_extractor``: a method which takes the Lambda Event as input and extracts an OTel Context from it. 
By default, the context is extracted from the HTTP headers of an API Gateway @@ -423,7 +423,9 @@ def _instrument(self, **kwargs): self._wrapped_function_name, ) = lambda_handler.rsplit(".", 1) - flush_timeout_env = os.environ.get(OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, None) + flush_timeout_env = os.environ.get( + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, None + ) flush_timeout = 30000 try: if flush_timeout_env is not None: @@ -438,10 +440,11 @@ def _instrument(self, **kwargs): self._wrapped_module_name, self._wrapped_function_name, flush_timeout, - event_context_extractor=kwargs.get("event_context_extractor", _default_event_context_extractor), + event_context_extractor=kwargs.get( + "event_context_extractor", _default_event_context_extractor + ), tracer_provider=kwargs.get("tracer_provider"), meter_provider=kwargs.get("meter_provider"), - logger_provider=kwargs.get("logger_provider"), ) def _uninstrument(self, **kwargs): diff --git a/lambda-layer/src/otel-instrument b/lambda-layer/src/otel-instrument index 65a9edc2e..6965ba12d 100755 --- a/lambda-layer/src/otel-instrument +++ b/lambda-layer/src/otel-instrument @@ -98,19 +98,6 @@ if [ -z ${OTEL_PROPAGATORS} ]; then export OTEL_PROPAGATORS="baggage,xray,tracecontext" fi -# disable application signals runtime metrics by default -export OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED="false"; - -# disable otel metrics export by default -if [ -z "${OTEL_METRICS_EXPORTER}" ]; then - export OTEL_METRICS_EXPORTER="none"; -fi - -# disable OTel logs exporter by default -if [ -z "${OTEL_LOGS_EXPORTER}" ]; then - export OTEL_LOGS_EXPORTER="none"; -fi - if [ -z "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" ]; then export OTEL_AWS_APPLICATION_SIGNALS_ENABLED="true"; fi @@ -120,6 +107,17 @@ fi if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "true" ]; then export OTEL_PYTHON_DISTRO="aws_distro"; export OTEL_PYTHON_CONFIGURATOR="aws_configurator"; + if [ -z "${OTEL_METRICS_EXPORTER}" ]; then + export OTEL_METRICS_EXPORTER="none"; + fi +fi + +# - If Application Signals is disabled + +if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "false" ]; then + if [ -z "${OTEL_METRICS_EXPORTER}" ]; then + export OTEL_METRICS_EXPORTER="none"; + fi fi if [ -z "${OTEL_RESOURCE_ATTRIBUTES}" ]; then
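The otel-instrument change above moves the OTEL_METRICS_EXPORTER default out of the unconditional block: the default of "none" is now applied in both the Application Signals enabled and disabled branches, and only when the user has not set the variable themselves. A Python rendering of that gating, behavior only, with names mirroring the script (the layer itself performs this in shell at startup):

    # Python rendering of the OTEL_METRICS_EXPORTER defaulting added above.
    import os

    def apply_metrics_exporter_default() -> None:
        # The script defaults OTEL_AWS_APPLICATION_SIGNALS_ENABLED to "true" when unset.
        app_signals = os.environ.get("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "true")
        if app_signals == "true":
            os.environ["OTEL_PYTHON_DISTRO"] = "aws_distro"
            os.environ["OTEL_PYTHON_CONFIGURATOR"] = "aws_configurator"
        # In both branches the metrics exporter defaults to "none" only when unset
        # or empty, so an explicit user-provided OTEL_METRICS_EXPORTER always wins.
        if app_signals in ("true", "false") and not os.environ.get("OTEL_METRICS_EXPORTER"):
            os.environ["OTEL_METRICS_EXPORTER"] = "none"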