diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py
new file mode 100644
index 000000000..e2e364b03
--- /dev/null
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py
@@ -0,0 +1,523 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+# pylint: disable=no-self-use
+
+import json
+import logging
+import time
+import uuid
+from collections import defaultdict
+from typing import Any, Dict, List, Optional, Tuple
+
+import botocore.session
+from botocore.exceptions import ClientError
+
+from opentelemetry.sdk.metrics import (
+    Counter,
+    Histogram,
+    ObservableCounter,
+    ObservableGauge,
+    ObservableUpDownCounter,
+    UpDownCounter,
+)
+from opentelemetry.sdk.metrics._internal.point import Metric
+from opentelemetry.sdk.metrics.export import (
+    AggregationTemporality,
+    Gauge,
+    MetricExporter,
+    MetricExportResult,
+    MetricsData,
+    NumberDataPoint,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.util.types import Attributes
+
+logger = logging.getLogger(__name__)
+
+
+class MetricRecord:
+    """A unified representation of OTel metric data, used for OTel-to-CloudWatch-EMF conversion."""
+
+    def __init__(self, metric_name: str, metric_unit: str, metric_description: str):
+        """
+        Initialize metric record.
+
+        Args:
+            metric_name: Name of the metric
+            metric_unit: Unit of the metric
+            metric_description: Description of the metric
+        """
+        # Instrument metadata
+        self.name = metric_name
+        self.unit = metric_unit
+        self.description = metric_description
+
+        # Will be set by conversion methods
+        self.timestamp: Optional[int] = None
+        self.attributes: Attributes = {}
+
+        # Different metric type data - only one will be set per record
+        self.value: Optional[float] = None
+        self.sum_data: Optional[Any] = None
+        self.histogram_data: Optional[Any] = None
+        self.exp_histogram_data: Optional[Any] = None
+
+
+class AwsCloudWatchEmfExporter(MetricExporter):
+    """
+    OpenTelemetry metrics exporter for the CloudWatch EMF format.
+
+    This exporter converts OTel metrics into CloudWatch EMF logs, which are then
+    sent to CloudWatch Logs. CloudWatch Logs automatically extracts the metrics
+    from the EMF logs.
+
+    https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
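+
+    Illustrative usage (a sketch; the namespace and log group name are example
+    values, not defaults; MeterProvider and PeriodicExportingMetricReader come
+    from the OpenTelemetry SDK):
+
+        exporter = AwsCloudWatchEmfExporter(
+            namespace="MyApp", log_group_name="/metrics/my-app"
+        )
+        provider = MeterProvider(
+            metric_readers=[PeriodicExportingMetricReader(exporter)]
+        )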
+
+    """
+
+    # CloudWatch EMF supported units
+    # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
+    EMF_SUPPORTED_UNITS = {
+        "Seconds",
+        "Microseconds",
+        "Milliseconds",
+        "Bytes",
+        "Kilobytes",
+        "Megabytes",
+        "Gigabytes",
+        "Terabytes",
+        "Bits",
+        "Kilobits",
+        "Megabits",
+        "Gigabits",
+        "Terabits",
+        "Percent",
+        "Count",
+        "Bytes/Second",
+        "Kilobytes/Second",
+        "Megabytes/Second",
+        "Gigabytes/Second",
+        "Terabytes/Second",
+        "Bits/Second",
+        "Kilobits/Second",
+        "Megabits/Second",
+        "Gigabits/Second",
+        "Terabits/Second",
+        "Count/Second",
+        "None",
+    }
+
+    # OTel to CloudWatch unit mapping
+    # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188
+    UNIT_MAPPING = {
+        "1": "",
+        "ns": "",
+        "ms": "Milliseconds",
+        "s": "Seconds",
+        "us": "Microseconds",
+        "By": "Bytes",
+        "bit": "Bits",
+    }
+
+    def __init__(
+        self,
+        namespace: str = "default",
+        log_group_name: Optional[str] = None,
+        log_stream_name: Optional[str] = None,
+        aws_region: Optional[str] = None,
+        preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None,
+        **kwargs,
+    ):
+        """
+        Initialize the CloudWatch EMF exporter.
+
+        Args:
+            namespace: CloudWatch namespace for metrics
+            log_group_name: CloudWatch log group name
+            log_stream_name: CloudWatch log stream name (auto-generated if None)
+            aws_region: AWS region (auto-detected if None)
+            preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality
+            **kwargs: Additional arguments passed to the botocore client
+        """
+        # Default the temporality preference to DELTA if the customer has not set one
+        if preferred_temporality is None:
+            preferred_temporality = {
+                Counter: AggregationTemporality.DELTA,
+                Histogram: AggregationTemporality.DELTA,
+                ObservableCounter: AggregationTemporality.DELTA,
+                ObservableGauge: AggregationTemporality.DELTA,
+                ObservableUpDownCounter: AggregationTemporality.DELTA,
+                UpDownCounter: AggregationTemporality.DELTA,
+            }
+
+        super().__init__(preferred_temporality)
+
+        self.namespace = namespace
+        self.log_group_name = log_group_name
+        self.log_stream_name = log_stream_name or self._generate_log_stream_name()
+
+        session = botocore.session.Session()
+        self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs)
+
+        # Ensure log group exists
+        self._ensure_log_group_exists()
+
+        # Ensure log stream exists
+        self._ensure_log_stream_exists()
+
+    # Default to a unique log stream name, matching the OTel Collector
+    # EMF exporter behavior, with the language included for source identification
+    def _generate_log_stream_name(self) -> str:
+        """Generate a unique log stream name."""
+
+        unique_id = str(uuid.uuid4())[:8]
+        return f"otel-python-{unique_id}"
+
+    def _ensure_log_group_exists(self):
+        """Ensure the log group exists, create if it doesn't."""
+        try:
+            self.logs_client.create_log_group(logGroupName=self.log_group_name)
+            logger.info("Created log group: %s", self.log_group_name)
+        except ClientError as error:
+            if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
+                logger.debug("Log group %s already exists", self.log_group_name)
+            else:
+                logger.error("Failed to create log group %s: %s", self.log_group_name, error)
+                raise
+
+    def _ensure_log_stream_exists(self):
+        """Ensure the log stream exists, create if it doesn't."""
+        try:
+            self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
+            logger.info("Created log stream: %s", self.log_stream_name)
+        except ClientError as error:
+            if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
+                logger.debug("Log stream %s already exists", self.log_stream_name)
+            else:
+                logger.error("Failed to create log stream %s: %s", self.log_stream_name, error)
+                raise
+
+    def _get_metric_name(self, record: MetricRecord) -> Optional[str]:
+        """Get the metric name from the metric record or data point."""
+
+        try:
+            if record.name:
+                return record.name
+        except AttributeError:
+            pass
+        # Return None if no valid metric name found
+        return None
+
+    def _get_unit(self, record: MetricRecord) -> Optional[str]:
+        """Get the CloudWatch unit from the MetricRecord unit."""
+        unit = record.unit
+
+        if not unit:
+            return None
+
+        # First check if the unit is already a supported EMF unit
+        if unit in self.EMF_SUPPORTED_UNITS:
+            return unit
+
+        # Map from OTel unit to CloudWatch unit
+        mapped_unit = self.UNIT_MAPPING.get(unit)
+
+        return mapped_unit
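+
+    # Illustrative _get_unit results, derived from the tables above (a sketch;
+    # "foo" stands in for any unit covered by neither table):
+    #   "ms"    -> "Milliseconds"  (OTel unit mapped to a CloudWatch unit)
+    #   "Count" -> "Count"         (already an EMF-supported unit, passed through)
+    #   "1"     -> ""              (maps to the empty string, so no Unit is emitted)
+    #   "foo"   -> None            (unknown unit, no Unit is emitted)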
logger.info("Created log stream: %s", self.log_stream_name) + except ClientError as error: + if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": + logger.debug("Log stream %s already exists", self.log_stream_name) + else: + logger.error("Failed to create log stream %s : %s", self.log_group_name, error) + raise + + def _get_metric_name(self, record: MetricRecord) -> Optional[str]: + """Get the metric name from the metric record or data point.""" + + try: + if record.name: + return record.name + except AttributeError: + pass + # Return None if no valid metric name found + return None + + def _get_unit(self, record: MetricRecord) -> Optional[str]: + """Get CloudWatch unit from MetricRecord unit.""" + unit = record.unit + + if not unit: + return None + + # First check if unit is already a supported EMF unit + if unit in self.EMF_SUPPORTED_UNITS: + return unit + + # Map from OTel unit to CloudWatch unit + mapped_unit = self.UNIT_MAPPING.get(unit) + + return mapped_unit + + def _get_dimension_names(self, attributes: Attributes) -> List[str]: + """Extract dimension names from attributes.""" + # Implement dimension selection logic + # For now, use all attributes as dimensions + return list(attributes.keys()) + + def _get_attributes_key(self, attributes: Attributes) -> str: + """ + Create a hashable key from attributes for grouping metrics. + + Args: + attributes: The attributes dictionary + + Returns: + A string representation of sorted attributes key-value pairs + """ + # Sort the attributes to ensure consistent keys + sorted_attrs = sorted(attributes.items()) + # Create a string representation of the attributes + return str(sorted_attrs) + + def _normalize_timestamp(self, timestamp_ns: int) -> int: + """ + Normalize a nanosecond timestamp to milliseconds for CloudWatch. + + Args: + timestamp_ns: Timestamp in nanoseconds + + Returns: + Timestamp in milliseconds + """ + # Convert from nanoseconds to milliseconds + return timestamp_ns // 1_000_000 + + def _create_metric_record(self, metric_name: str, metric_unit: str, metric_description: str) -> MetricRecord: + """ + Creates the intermediate metric data structure that standardizes different otel metric representation + and will be used to generate EMF events. The base record + establishes the instrument schema (name/unit/description) that will be populated + with dimensions, timestamps, and values during metric processing. + + Args: + metric_name: Name of the metric + metric_unit: Unit of the metric + metric_description: Description of the metric + + Returns: + A MetricRecord object + """ + return MetricRecord(metric_name, metric_unit, metric_description) + + def _convert_gauge(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: + """Convert a Gauge metric datapoint to a metric record. 
+
+    def _normalize_timestamp(self, timestamp_ns: int) -> int:
+        """
+        Normalize a nanosecond timestamp to milliseconds for CloudWatch.
+
+        Args:
+            timestamp_ns: Timestamp in nanoseconds
+
+        Returns:
+            Timestamp in milliseconds
+        """
+        # Convert from nanoseconds to milliseconds
+        return timestamp_ns // 1_000_000
+
+    def _create_metric_record(self, metric_name: str, metric_unit: str, metric_description: str) -> MetricRecord:
+        """
+        Creates the intermediate metric data structure that standardizes the different OTel metric
+        representations and is used to generate EMF events. The base record
+        establishes the instrument schema (name/unit/description) that will be populated
+        with dimensions, timestamps, and values during metric processing.
+
+        Args:
+            metric_name: Name of the metric
+            metric_unit: Unit of the metric
+            metric_description: Description of the metric
+
+        Returns:
+            A MetricRecord object
+        """
+        return MetricRecord(metric_name, metric_unit, metric_description)
+
+    def _convert_gauge(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord:
+        """Convert a Gauge metric datapoint to a metric record.
+
+        Args:
+            metric: The metric object
+            data_point: The datapoint to convert
+
+        Returns:
+            MetricRecord with populated timestamp, attributes, and value
+        """
+        # Create base record
+        record = self._create_metric_record(metric.name, metric.unit, metric.description)
+
+        # Set timestamp
+        try:
+            timestamp_ms = (
+                self._normalize_timestamp(data_point.time_unix_nano)
+                if data_point.time_unix_nano is not None
+                else int(time.time() * 1000)
+            )
+        except AttributeError:
+            # data_point doesn't have time_unix_nano attribute
+            timestamp_ms = int(time.time() * 1000)
+        record.timestamp = timestamp_ms
+
+        # Set attributes
+        try:
+            record.attributes = data_point.attributes
+        except AttributeError:
+            # data_point doesn't have attributes
+            record.attributes = {}
+
+        # For Gauge, set the value directly
+        try:
+            record.value = data_point.value
+        except AttributeError:
+            # data_point doesn't have value
+            record.value = None
+
+        return record
+
+    def _group_by_attributes_and_timestamp(self, record: MetricRecord) -> Tuple[str, int]:
+        """Group metric record by attributes and timestamp.
+
+        Args:
+            record: The metric record
+
+        Returns:
+            A tuple key for grouping
+        """
+        # Create a key for grouping based on attributes
+        attrs_key = self._get_attributes_key(record.attributes)
+        return (attrs_key, record.timestamp)
+
+    def _create_emf_log(
+        self, metric_records: List[MetricRecord], resource: Resource, timestamp: Optional[int] = None
+    ) -> Dict:
+        """
+        Create an EMF log dictionary from metric records.
+
+        Since metric_records is already grouped by attributes, this function
+        creates a single EMF log for all records.
+        """
+        # Start with base structure
+        emf_log = {"_aws": {"Timestamp": timestamp or int(time.time() * 1000), "CloudWatchMetrics": []}}
+
+        # Set with latest EMF version schema
+        # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414
+        emf_log["Version"] = "1"
+
+        # Add resource attributes to the EMF log, but not as dimensions.
+        # The OTel Collector EMF exporter has a resource_to_telemetry_conversion flag that converts resource
+        # attributes into regular metric attributes (potential dimensions). For this SDK implementation,
+        # we align with the OpenTelemetry concept that all metric attributes are treated as dimensions,
+        # while resource attributes are kept as additional metadata only, prefixed with "otel.resource."
+        # to distinguish them.
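+
+        # Illustrative shape of the EMF log this method builds (hypothetical values),
+        # for one gauge "latency_ms" with attribute {"env": "prod"} under namespace "MyApp":
+        # {
+        #     "_aws": {"Timestamp": 1700000000000, "CloudWatchMetrics": [
+        #         {"Namespace": "MyApp", "Dimensions": [["env"]],
+        #          "Metrics": [{"Name": "latency_ms", "Unit": "Milliseconds"}]}]},
+        #     "Version": "1",
+        #     "otel.resource.service.name": "my-service",
+        #     "env": "prod",
+        #     "latency_ms": 12.5
+        # }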
+        if resource and resource.attributes:
+            for key, value in resource.attributes.items():
+                emf_log[f"otel.resource.{key}"] = str(value)
+
+        # Initialize collections for dimensions and metrics
+        metric_definitions = []
+        # Collect attributes from all records (they should be the same for all records in the group)
+        # Only collect once from the first record and apply to all records
+        all_attributes = metric_records[0].attributes if metric_records and metric_records[0].attributes else {}
+
+        # Process each metric record
+        for record in metric_records:
+
+            metric_name = self._get_metric_name(record)
+
+            # Skip processing if metric name is None or empty
+            if not metric_name:
+                continue
+
+            # Skip processing if the metric value is None
+            if record.value is None:
+                logger.debug("Skipping metric %s as it does not have a valid metric value", metric_name)
+                continue
+
+            # Create metric data dict
+            metric_data = {"Name": metric_name}
+
+            unit = self._get_unit(record)
+            if unit:
+                metric_data["Unit"] = unit
+
+            # Add to metric definitions list
+            metric_definitions.append(metric_data)
+
+            emf_log[metric_name] = record.value
+
+        # Get dimension names from collected attributes
+        dimension_names = self._get_dimension_names(all_attributes)
+
+        # Add attribute values to the root of the EMF log
+        for name, value in all_attributes.items():
+            emf_log[name] = str(value)
+
+        # Add the single dimension set to CloudWatch Metrics if we have dimensions and metrics
+        if dimension_names and metric_definitions:
+            emf_log["_aws"]["CloudWatchMetrics"].append(
+                {"Namespace": self.namespace, "Dimensions": [dimension_names], "Metrics": metric_definitions}
+            )
+
+        return emf_log
+
+    # pylint: disable=no-member
+    def _send_log_event(self, log_event: Dict[str, Any]):
+        """
+        Send a log event to CloudWatch Logs.
+
+        Basic implementation for PR 1 - sends individual events directly.
+
+        TODO: Batch events and follow CloudWatch Logs quota constraints - number of events & size limit per payload
+        """
+        try:
+            # Send the log event
+            response = self.logs_client.put_log_events(
+                logGroupName=self.log_group_name, logStreamName=self.log_stream_name, logEvents=[log_event]
+            )
+
+            logger.debug("Successfully sent log event")
+            return response
+
+        except ClientError as error:
+            logger.debug("Failed to send log event: %s", error)
+            raise
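+
+    # Illustrative shape of a single log event handed to put_log_events
+    # (the timestamp value is an example):
+    #   {"message": "<serialized EMF JSON document>", "timestamp": 1700000000000}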
+
+    # pylint: disable=too-many-nested-blocks
+    def export(
+        self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **kwargs: Any
+    ) -> MetricExportResult:
+        """
+        Export metrics as EMF logs to CloudWatch.
+
+        Groups metrics by attributes and timestamp before creating EMF logs.
+
+        Args:
+            metrics_data: MetricsData containing resource metrics and scope metrics
+            timeout_millis: Optional timeout in milliseconds
+            **kwargs: Additional keyword arguments
+
+        Returns:
+            MetricExportResult indicating success or failure
+        """
+        try:
+            if not metrics_data.resource_metrics:
+                return MetricExportResult.SUCCESS
+
+            # Process all metrics from all resource metrics and scope metrics
+            for resource_metrics in metrics_data.resource_metrics:
+                for scope_metrics in resource_metrics.scope_metrics:
+                    # Dictionary to group metrics by attributes and timestamp
+                    grouped_metrics = defaultdict(list)
+
+                    # Process all metrics in this scope
+                    for metric in scope_metrics.metrics:
+                        # Skip if metric.data is None or no data_points exist
+                        try:
+                            if not (metric.data and metric.data.data_points):
+                                continue
+                        except AttributeError:
+                            # Metric doesn't have data or data_points attribute
+                            continue
+
+                        # Process metrics based on type
+                        metric_type = type(metric.data)
+                        if metric_type == Gauge:
+                            for dp in metric.data.data_points:
+                                record = self._convert_gauge(metric, dp)
+                                grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record)
+                        else:
+                            logger.debug("Unsupported Metric Type: %s", metric_type)
+
+                    # Now process each group separately to create one EMF log per group
+                    for (_, timestamp_ms), metric_records in grouped_metrics.items():
+                        if not metric_records:
+                            continue
+
+                        # Create and send EMF log for this batch of metrics
+                        self._send_log_event(
+                            {
+                                "message": json.dumps(
+                                    self._create_emf_log(metric_records, resource_metrics.resource, timestamp_ms)
+                                ),
+                                "timestamp": timestamp_ms,
+                            }
+                        )
+
+            return MetricExportResult.SUCCESS
+        # pylint: disable=broad-exception-caught
+        # Capture all types of exceptions so as not to interrupt the instrumented services
+        except Exception as error:
+            logger.error("Failed to export metrics: %s", error)
+            return MetricExportResult.FAILURE
+
+    def force_flush(self, timeout_millis: int = 10000) -> bool:
+        """
+        Force flush any pending metrics.
+
+        TODO: add logic to handle graceful shutdown
+
+        Args:
+            timeout_millis: Timeout in milliseconds
+
+        Returns:
+            True if successful, False otherwise
+        """
+        logger.debug("AwsCloudWatchEmfExporter force flushes the buffered metrics")
+        return True
+
+    def shutdown(self, timeout_millis: Optional[int] = None, **kwargs: Any) -> bool:
+        """
+        Shutdown the exporter.
+        Override to handle the timeout and other keyword arguments; currently this only flushes.
+
+        TODO: add logic to handle graceful shutdown
+
+        Args:
+            timeout_millis: Ignored timeout in milliseconds
+            **kwargs: Ignored additional keyword arguments
+
+        Returns:
+            True on success
+        """
+        # No cleanup needed yet; just flush and log
+        self.force_flush(timeout_millis)
+        logger.debug("AwsCloudWatchEmfExporter shutdown called with timeout_millis=%s", timeout_millis)
+        return True
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/metrics/test_aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/metrics/test_aws_cloudwatch_emf_exporter.py
new file mode 100644
index 000000000..01d500c70
--- /dev/null
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/metrics/test_aws_cloudwatch_emf_exporter.py
@@ -0,0 +1,547 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0 + +import json +import time +import unittest +from unittest.mock import Mock, patch + +from botocore.exceptions import ClientError + +from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter +from opentelemetry.sdk.metrics.export import Gauge, MetricExportResult +from opentelemetry.sdk.resources import Resource + + +class MockDataPoint: + """Mock datapoint for testing.""" + + def __init__(self, value=10.0, attributes=None, time_unix_nano=None): + self.value = value + self.attributes = attributes or {} + self.time_unix_nano = time_unix_nano or int(time.time() * 1_000_000_000) + + +class MockMetric: + """Mock metric for testing.""" + + def __init__(self, name="test_metric", unit="1", description="Test metric"): + self.name = name + self.unit = unit + self.description = description + + +class MockGaugeData: + """Mock gauge data that passes isinstance checks.""" + + def __init__(self, data_points=None): + self.data_points = data_points or [] + + +class MockMetricWithData: + """Mock metric with data attribute.""" + + def __init__(self, name="test_metric", unit="1", description="Test metric", data=None): + self.name = name + self.unit = unit + self.description = description + self.data = data or MockGaugeData() + + +class MockResourceMetrics: + """Mock resource metrics for testing.""" + + def __init__(self, resource=None, scope_metrics=None): + self.resource = resource or Resource.create({"service.name": "test-service"}) + self.scope_metrics = scope_metrics or [] + + +class MockScopeMetrics: + """Mock scope metrics for testing.""" + + def __init__(self, scope=None, metrics=None): + self.scope = scope or Mock() + self.metrics = metrics or [] + + +# pylint: disable=too-many-public-methods +class TestAwsCloudWatchEmfExporter(unittest.TestCase): + """Test AwsCloudWatchEmfExporter class.""" + + def setUp(self): + """Set up test fixtures.""" + # Mock the botocore session to avoid AWS calls + with patch("botocore.session.Session") as mock_session: + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + mock_client.create_log_group.return_value = {} + mock_client.create_log_stream.return_value = {} + + self.exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + + def test_initialization(self): + """Test exporter initialization.""" + self.assertEqual(self.exporter.namespace, "TestNamespace") + self.assertIsNotNone(self.exporter.log_stream_name) + self.assertEqual(self.exporter.log_group_name, "test-log-group") + + @patch("botocore.session.Session") + def test_initialization_with_custom_params(self, mock_session): + """Test exporter initialization with custom parameters.""" + # Mock the botocore session to avoid AWS calls + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + mock_client.create_log_group.return_value = {} + mock_client.create_log_stream.return_value = {} + + exporter = AwsCloudWatchEmfExporter( + namespace="CustomNamespace", + log_group_name="custom-log-group", + log_stream_name="custom-stream", + aws_region="us-west-2", + ) + self.assertEqual(exporter.namespace, "CustomNamespace") + self.assertEqual(exporter.log_group_name, "custom-log-group") + self.assertEqual(exporter.log_stream_name, "custom-stream") + + def 
test_get_unit_mapping(self): + """Test unit mapping functionality.""" + # Test known units from UNIT_MAPPING + self.assertEqual( + self.exporter._get_unit(self.exporter._create_metric_record("test", "ms", "test")), "Milliseconds" + ) + self.assertEqual(self.exporter._get_unit(self.exporter._create_metric_record("test", "s", "test")), "Seconds") + self.assertEqual( + self.exporter._get_unit(self.exporter._create_metric_record("test", "us", "test")), "Microseconds" + ) + self.assertEqual(self.exporter._get_unit(self.exporter._create_metric_record("test", "By", "test")), "Bytes") + self.assertEqual(self.exporter._get_unit(self.exporter._create_metric_record("test", "bit", "test")), "Bits") + + # Test units that map to empty string (should return empty string from mapping) + self.assertEqual(self.exporter._get_unit(self.exporter._create_metric_record("test", "1", "test")), "") + self.assertEqual(self.exporter._get_unit(self.exporter._create_metric_record("test", "ns", "test")), "") + + # Test EMF supported units directly (should return as-is) + self.assertEqual(self.exporter._get_unit(self.exporter._create_metric_record("test", "Count", "test")), "Count") + self.assertEqual( + self.exporter._get_unit(self.exporter._create_metric_record("test", "Percent", "test")), "Percent" + ) + self.assertEqual( + self.exporter._get_unit(self.exporter._create_metric_record("test", "Kilobytes", "test")), "Kilobytes" + ) + + # Test unknown unit (not in mapping and not in supported units, returns None) + self.assertIsNone(self.exporter._get_unit(self.exporter._create_metric_record("test", "unknown", "test"))) + + # Test empty unit (should return None due to falsy check) + self.assertIsNone(self.exporter._get_unit(self.exporter._create_metric_record("test", "", "test"))) + + # Test None unit + self.assertIsNone(self.exporter._get_unit(self.exporter._create_metric_record("test", None, "test"))) + + def test_get_metric_name(self): + """Test metric name extraction.""" + # Test with record that has name attribute + record = Mock() + record.name = "test_metric" + + result = self.exporter._get_metric_name(record) + self.assertEqual(result, "test_metric") + + # Test with record that has empty name (should return None) + record_empty = Mock() + record_empty.name = "" + + result_empty = self.exporter._get_metric_name(record_empty) + self.assertIsNone(result_empty) + + def test_get_dimension_names(self): + """Test dimension names extraction.""" + attributes = {"service.name": "test-service", "env": "prod", "region": "us-east-1"} + + result = self.exporter._get_dimension_names(attributes) + + # Should return all attribute keys + self.assertEqual(set(result), {"service.name", "env", "region"}) + + def test_get_attributes_key(self): + """Test attributes key generation.""" + attributes = {"service": "test", "env": "prod"} + + result = self.exporter._get_attributes_key(attributes) + + # Should be a string representation of sorted attributes + self.assertIsInstance(result, str) + self.assertIn("service", result) + self.assertIn("test", result) + self.assertIn("env", result) + self.assertIn("prod", result) + + def test_get_attributes_key_consistent(self): + """Test that attributes key generation is consistent.""" + # Same attributes in different order should produce same key + attrs1 = {"b": "2", "a": "1"} + attrs2 = {"a": "1", "b": "2"} + + key1 = self.exporter._get_attributes_key(attrs1) + key2 = self.exporter._get_attributes_key(attrs2) + + self.assertEqual(key1, key2) + + def test_group_by_attributes_and_timestamp(self): + 
"""Test grouping by attributes and timestamp.""" + record = Mock() + record.attributes = {"env": "test"} + record.timestamp = 1234567890 + + result = self.exporter._group_by_attributes_and_timestamp(record) + + # Should return a tuple with attributes key and timestamp + self.assertIsInstance(result, tuple) + self.assertEqual(len(result), 2) + self.assertEqual(result[1], 1234567890) + + def test_generate_log_stream_name(self): + """Test log stream name generation.""" + name1 = self.exporter._generate_log_stream_name() + name2 = self.exporter._generate_log_stream_name() + + # Should generate unique names + self.assertNotEqual(name1, name2) + self.assertTrue(name1.startswith("otel-python-")) + self.assertTrue(name2.startswith("otel-python-")) + + def test_normalize_timestamp(self): + """Test timestamp normalization.""" + timestamp_ns = 1609459200000000000 # 2021-01-01 00:00:00 in nanoseconds + expected_ms = 1609459200000 # Same time in milliseconds + + result = self.exporter._normalize_timestamp(timestamp_ns) + self.assertEqual(result, expected_ms) + + def test_create_metric_record(self): + """Test metric record creation.""" + record = self.exporter._create_metric_record("test_metric", "Count", "Test description") + + self.assertIsNotNone(record) + self.assertEqual(record.name, "test_metric") + self.assertEqual(record.unit, "Count") + self.assertEqual(record.description, "Test description") + + def test_convert_gauge(self): + """Test gauge conversion.""" + metric = MockMetric("gauge_metric", "Count", "Gauge description") + dp = MockDataPoint(value=42.5, attributes={"key": "value"}) + + record = self.exporter._convert_gauge(metric, dp) + + self.assertIsNotNone(record) + self.assertEqual(record.name, "gauge_metric") + self.assertEqual(record.value, 42.5) + self.assertEqual(record.attributes, {"key": "value"}) + self.assertIsInstance(record.timestamp, int) + + def test_create_emf_log(self): + """Test EMF log creation.""" + # Create test records + gauge_record = self.exporter._create_metric_record("gauge_metric", "Count", "Gauge") + gauge_record.value = 50.0 + gauge_record.timestamp = int(time.time() * 1000) + gauge_record.attributes = {"env": "test"} + + records = [gauge_record] + resource = Resource.create({"service.name": "test-service"}) + + result = self.exporter._create_emf_log(records, resource) + + self.assertIsInstance(result, dict) + + # Check that the result is JSON serializable + json.dumps(result) # Should not raise exception + + @patch("botocore.session.Session") + def test_export_success(self, mock_session): + """Test successful export.""" + # Mock CloudWatch Logs client + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + mock_client.put_log_events.return_value = {"nextSequenceToken": "12345"} + + # Create empty metrics data to test basic export flow + metrics_data = Mock() + metrics_data.resource_metrics = [] + + result = self.exporter.export(metrics_data) + + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_export_failure(self): + """Test export failure handling.""" + # Create metrics data that will cause an exception during iteration + metrics_data = Mock() + # Make resource_metrics raise an exception when iterated over + metrics_data.resource_metrics = Mock() + metrics_data.resource_metrics.__iter__ = Mock(side_effect=Exception("Test exception")) + + result = self.exporter.export(metrics_data) + + self.assertEqual(result, 
MetricExportResult.FAILURE) + + def test_force_flush_no_pending_events(self): + """Test force flush functionality with no pending events.""" + result = self.exporter.force_flush() + + self.assertTrue(result) + + @patch.object(AwsCloudWatchEmfExporter, "force_flush") + def test_shutdown(self, mock_force_flush): + """Test shutdown functionality.""" + mock_force_flush.return_value = True + + result = self.exporter.shutdown(timeout_millis=5000) + + self.assertTrue(result) + mock_force_flush.assert_called_once_with(5000) + + def test_send_log_event_method_exists(self): + """Test that _send_log_event method exists and can be called.""" + # Just test that the method exists and doesn't crash with basic input + log_event = {"message": "test message", "timestamp": 1234567890} + + # Mock the AWS client methods to avoid actual AWS calls + with patch.object(self.exporter.logs_client, "create_log_group"): + with patch.object(self.exporter.logs_client, "create_log_stream"): + with patch.object(self.exporter.logs_client, "put_log_events") as mock_put: + mock_put.return_value = {"nextSequenceToken": "12345"} + + # Should not raise an exception + try: + response = self.exporter._send_log_event(log_event) + # Response may be None or a dict, both are acceptable + self.assertTrue(response is None or isinstance(response, dict)) + except ClientError as error: + self.fail(f"_send_log_event raised an exception: {error}") + + def test_create_emf_log_with_resource(self): + """Test EMF log creation with resource attributes.""" + # Create test records + gauge_record = self.exporter._create_metric_record("gauge_metric", "Count", "Gauge") + gauge_record.value = 50.0 + gauge_record.timestamp = int(time.time() * 1000) + gauge_record.attributes = {"env": "test", "service": "api"} + + records = [gauge_record] + resource = Resource.create({"service.name": "test-service", "service.version": "1.0.0"}) + + result = self.exporter._create_emf_log(records, resource, 1234567890) + + # Verify EMF log structure + self.assertIn("_aws", result) + self.assertIn("CloudWatchMetrics", result["_aws"]) + self.assertEqual(result["_aws"]["Timestamp"], 1234567890) + self.assertEqual(result["Version"], "1") + + # Check resource attributes are prefixed + self.assertEqual(result["otel.resource.service.name"], "test-service") + self.assertEqual(result["otel.resource.service.version"], "1.0.0") + + # Check metric attributes + self.assertEqual(result["env"], "test") + self.assertEqual(result["service"], "api") + + # Check metric value + self.assertEqual(result["gauge_metric"], 50.0) + + # Check CloudWatch metrics structure + cw_metrics = result["_aws"]["CloudWatchMetrics"][0] + self.assertEqual(cw_metrics["Namespace"], "TestNamespace") + self.assertEqual(set(cw_metrics["Dimensions"][0]), {"env", "service"}) + self.assertEqual(cw_metrics["Metrics"][0]["Name"], "gauge_metric") + + @patch("botocore.session.Session") + def test_export_with_gauge_metrics(self, mock_session): + """Test exporting actual gauge metrics.""" + # Mock CloudWatch Logs client + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + mock_client.put_log_events.return_value = {"nextSequenceToken": "12345"} + mock_client.create_log_group.side_effect = ClientError( + {"Error": {"Code": "ResourceAlreadyExistsException"}}, "CreateLogGroup" + ) + mock_client.create_log_stream.side_effect = ClientError( + {"Error": {"Code": "ResourceAlreadyExistsException"}}, 
"CreateLogStream" + ) + + # Create mock metrics data + resource = Resource.create({"service.name": "test-service"}) + + # Create gauge data + gauge_data = Gauge(data_points=[MockDataPoint(value=42.0, attributes={"key": "value"})]) + + metric = MockMetricWithData(name="test_gauge", data=gauge_data) + + scope_metrics = MockScopeMetrics(metrics=[metric]) + resource_metrics = MockResourceMetrics(resource=resource, scope_metrics=[scope_metrics]) + + metrics_data = Mock() + metrics_data.resource_metrics = [resource_metrics] + + result = self.exporter.export(metrics_data) + + self.assertEqual(result, MetricExportResult.SUCCESS) + # Test validates that export works with gauge metrics + + def test_get_metric_name_fallback(self): + """Test metric name extraction fallback.""" + # Test with record that has no instrument attribute + record = Mock(spec=[]) + + result = self.exporter._get_metric_name(record) + self.assertIsNone(result) + + def test_get_metric_name_empty_name(self): + """Test metric name extraction with empty name.""" + # Test with record that has empty name + record = Mock() + record.name = "" + + result = self.exporter._get_metric_name(record) + self.assertIsNone(result) + + def test_create_emf_log_skips_empty_metric_names(self): + """Test that EMF log creation skips records with empty metric names.""" + # Create a record with no metric name + record_without_name = Mock() + record_without_name.attributes = {"key": "value"} + record_without_name.value = 10.0 + record_without_name.name = None # No valid name + + # Create a record with valid metric name + valid_record = self.exporter._create_metric_record("valid_metric", "Count", "Valid metric") + valid_record.value = 20.0 + valid_record.attributes = {"key": "value"} + + records = [record_without_name, valid_record] + resource = Resource.create({"service.name": "test-service"}) + + result = self.exporter._create_emf_log(records, resource, 1234567890) + + # Only the valid record should be processed + self.assertIn("valid_metric", result) + self.assertEqual(result["valid_metric"], 20.0) + + # Check that only the valid metric is in the definitions (empty names are skipped) + cw_metrics = result["_aws"]["CloudWatchMetrics"][0] + self.assertEqual(len(cw_metrics["Metrics"]), 1) + # Ensure our valid metric is present + metric_names = [m["Name"] for m in cw_metrics["Metrics"]] + self.assertIn("valid_metric", metric_names) + + @patch("os.environ.get") + @patch("botocore.session.Session") + def test_initialization_with_env_region(self, mock_session, mock_env_get): + """Test initialization with AWS region from environment.""" + # Mock environment variable + mock_env_get.side_effect = lambda key: "us-west-1" if key == "AWS_REGION" else None + + # Mock the botocore session to avoid AWS calls + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + mock_client.create_log_group.return_value = {} + mock_client.create_log_stream.return_value = {} + + exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + + # Just verify the exporter was created successfully with region handling + self.assertIsNotNone(exporter) + self.assertEqual(exporter.namespace, "TestNamespace") + + @patch("botocore.session.Session") + def test_ensure_log_group_exists_create_failure(self, mock_session): + """Test log group creation failure.""" + # Mock the botocore session + mock_client = Mock() + mock_session_instance = Mock() 
+ mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + + # Make create fail with access denied error + mock_client.create_log_group.side_effect = ClientError({"Error": {"Code": "AccessDenied"}}, "CreateLogGroup") + mock_client.create_log_stream.return_value = {} + + with self.assertRaises(ClientError): + AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + + @patch("botocore.session.Session") + def test_ensure_log_group_exists_success(self, mock_session): + """Test log group existence check when log group already exists.""" + # Mock the botocore session + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + + # Make create fail with ResourceAlreadyExistsException (log group exists) + mock_client.create_log_group.side_effect = ClientError( + {"Error": {"Code": "ResourceAlreadyExistsException"}}, "CreateLogGroup" + ) + mock_client.create_log_stream.return_value = {} + + # This should not raise an exception + exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + self.assertIsNotNone(exporter) + # Verify create was called once + mock_client.create_log_group.assert_called_once_with(logGroupName="test-log-group") + + def test_export_with_unsupported_metric_type(self): + """Test export with unsupported metric types.""" + # Create mock metrics data with unsupported metric type + resource = Resource.create({"service.name": "test-service"}) + + # Create non-gauge data + unsupported_data = Mock() + unsupported_data.data_points = [MockDataPoint(value=42.0)] + + metric = MockMetricWithData(name="test_counter", data=unsupported_data) + + scope_metrics = MockScopeMetrics(metrics=[metric]) + resource_metrics = MockResourceMetrics(resource=resource, scope_metrics=[scope_metrics]) + + metrics_data = Mock() + metrics_data.resource_metrics = [resource_metrics] + + # Should still return success even with unsupported metrics + result = self.exporter.export(metrics_data) + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_export_with_metric_without_data(self): + """Test export with metrics that don't have data attribute.""" + # Create mock metrics data + resource = Resource.create({"service.name": "test-service"}) + + # Create metric without data attribute + metric = Mock(spec=[]) + + scope_metrics = MockScopeMetrics(metrics=[metric]) + resource_metrics = MockResourceMetrics(resource=resource, scope_metrics=[scope_metrics]) + + metrics_data = Mock() + metrics_data.resource_metrics = [resource_metrics] + + # Should still return success + result = self.exporter.export(metrics_data) + self.assertEqual(result, MetricExportResult.SUCCESS) + + +if __name__ == "__main__": + unittest.main()