diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py new file mode 100644 index 000000000..72236121f --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py @@ -0,0 +1,381 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=no-self-use + +import logging +import time +import uuid +from typing import Any, Dict, List, Optional + +import botocore.session +from botocore.exceptions import ClientError + +logger = logging.getLogger(__name__) + + +class LogEventBatch: + """ + Container for a batch of CloudWatch log events with metadata. + + Tracks the log events, total byte size, and timestamps + for efficient batching and validation. + """ + + def __init__(self): + """Initialize an empty log event batch.""" + self.log_events: List[Dict[str, Any]] = [] + self.byte_total: int = 0 + self.min_timestamp_ms: int = 0 + self.max_timestamp_ms: int = 0 + self.created_timestamp_ms: int = int(time.time() * 1000) + + def add_event(self, log_event: Dict[str, Any], event_size: int) -> None: + """ + Add a log event to the batch. + + Args: + log_event: The log event to add + event_size: The byte size of the event + """ + self.log_events.append(log_event) + self.byte_total += event_size + + # Update timestamp tracking + timestamp = log_event.get("timestamp", 0) + if self.min_timestamp_ms == 0 or timestamp < self.min_timestamp_ms: + self.min_timestamp_ms = timestamp + if timestamp > self.max_timestamp_ms: + self.max_timestamp_ms = timestamp + + def is_empty(self) -> bool: + """Check if the batch is empty.""" + return len(self.log_events) == 0 + + def size(self) -> int: + """Get the number of events in the batch.""" + return len(self.log_events) + + def clear(self) -> None: + """Clear the batch.""" + self.log_events.clear() + self.byte_total = 0 + self.min_timestamp_ms = 0 + self.max_timestamp_ms = 0 + self.created_timestamp_ms = int(time.time() * 1000) + + +class CloudWatchLogClient: + """ + CloudWatch Logs client for batching and sending log events. + + This class handles the batching logic and CloudWatch Logs API interactions + for sending EMF logs efficiently while respecting CloudWatch Logs constraints. + """ + + # Constants for CloudWatch Logs limits + # http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html + # http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html + CW_MAX_EVENT_PAYLOAD_BYTES = 256 * 1024 # 256KB + CW_MAX_REQUEST_EVENT_COUNT = 10000 + CW_PER_EVENT_HEADER_BYTES = 26 + BATCH_FLUSH_INTERVAL = 60 * 1000 + CW_MAX_REQUEST_PAYLOAD_BYTES = 1 * 1024 * 1024 # 1MB + CW_TRUNCATED_SUFFIX = "[Truncated...]" + # None of the log events in the batch can be older than 14 days + CW_EVENT_TIMESTAMP_LIMIT_PAST = 14 * 24 * 60 * 60 * 1000 + # None of the log events in the batch can be more than 2 hours in the future. + CW_EVENT_TIMESTAMP_LIMIT_FUTURE = 2 * 60 * 60 * 1000 + + def __init__( + self, + log_group_name: str, + log_stream_name: Optional[str] = None, + aws_region: Optional[str] = None, + **kwargs, + ): + """ + Initialize the CloudWatch Logs client. 
+ + Args: + log_group_name: CloudWatch log group name + log_stream_name: CloudWatch log stream name (auto-generated if None) + aws_region: AWS region (auto-detected if None) + **kwargs: Additional arguments passed to botocore client + """ + self.log_group_name = log_group_name + self.log_stream_name = log_stream_name or self._generate_log_stream_name() + + session = botocore.session.Session() + self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs) + + # Event batch to store logs before sending to CloudWatch + self._event_batch = None + + def _generate_log_stream_name(self) -> str: + """Generate a unique log stream name.""" + unique_id = str(uuid.uuid4())[:8] + return f"otel-python-{unique_id}" + + def _create_log_group_if_needed(self): + """Create log group if it doesn't exist.""" + try: + self.logs_client.create_log_group(logGroupName=self.log_group_name) + logger.info("Created log group: %s", self.log_group_name) + except ClientError as error: + if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": + logger.debug("Log group %s already exists", self.log_group_name) + else: + logger.error("Failed to create log group %s : %s", self.log_group_name, error) + raise + + def _create_log_stream_if_needed(self): + """Create log stream if it doesn't exist.""" + try: + self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name) + logger.info("Created log stream: %s", self.log_stream_name) + except ClientError as error: + if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": + logger.debug("Log stream %s already exists", self.log_stream_name) + else: + logger.error("Failed to create log stream %s : %s", self.log_stream_name, error) + raise + + def _validate_log_event(self, log_event: Dict) -> bool: + """ + Validate the log event according to CloudWatch Logs constraints. + Implements the same validation logic as the Go version. + + Args: + log_event: The log event to validate + + Returns: + bool: True if valid, False otherwise + """ + + # Check empty message + if not log_event.get("message") or not log_event.get("message").strip(): + logger.error("Empty log event message") + return False + + message = log_event.get("message", "") + timestamp = log_event.get("timestamp", 0) + + # Check message size + message_size = len(message) + self.CW_PER_EVENT_HEADER_BYTES + if message_size > self.CW_MAX_EVENT_PAYLOAD_BYTES: + logger.warning( + "Log event size %s exceeds maximum allowed size %s. Truncating.", + message_size, + self.CW_MAX_EVENT_PAYLOAD_BYTES, + ) + max_message_size = ( + self.CW_MAX_EVENT_PAYLOAD_BYTES - self.CW_PER_EVENT_HEADER_BYTES - len(self.CW_TRUNCATED_SUFFIX) + ) + log_event["message"] = message[:max_message_size] + self.CW_TRUNCATED_SUFFIX + + # Check timestamp constraints + current_time = int(time.time() * 1000) # Current time in milliseconds + event_time = timestamp + + # Calculate the time difference + time_diff = current_time - event_time + + # Check if too old or too far in the future + if time_diff > self.CW_EVENT_TIMESTAMP_LIMIT_PAST or time_diff < -self.CW_EVENT_TIMESTAMP_LIMIT_FUTURE: + logger.error( + "Log event timestamp %s is either older than 14 days or more than 2 hours in the future. " + "Current time: %s", + event_time, + current_time, + ) + return False + + return True + + def _create_event_batch(self) -> LogEventBatch: + """ + Create a new log event batch. 
+ + Returns: + LogEventBatch: A new event batch + """ + return LogEventBatch() + + def _event_batch_exceeds_limit(self, batch: LogEventBatch, next_event_size: int) -> bool: + """ + Check if adding the next event would exceed CloudWatch Logs limits. + + Args: + batch: The current batch + next_event_size: Size of the next event in bytes + + Returns: + bool: True if adding the next event would exceed limits + """ + return ( + batch.size() >= self.CW_MAX_REQUEST_EVENT_COUNT + or batch.byte_total + next_event_size > self.CW_MAX_REQUEST_PAYLOAD_BYTES + ) + + def _is_batch_active(self, batch: LogEventBatch, target_timestamp_ms: int) -> bool: + """ + Check if the event batch spans more than 24 hours. + + Args: + batch: The event batch + target_timestamp_ms: The timestamp of the event to add + + Returns: + bool: True if the batch is active and can accept the event + """ + # New log event batch + if batch.min_timestamp_ms == 0 or batch.max_timestamp_ms == 0: + return True + + # Check if adding the event would make the batch span more than 24 hours + if target_timestamp_ms - batch.min_timestamp_ms > 24 * 3600 * 1000: + return False + + if batch.max_timestamp_ms - target_timestamp_ms > 24 * 3600 * 1000: + return False + + # flush the event batch when reached 60s interval + current_time = int(time.time() * 1000) + if current_time - batch.created_timestamp_ms >= self.BATCH_FLUSH_INTERVAL: + return False + + return True + + def _sort_log_events(self, batch: LogEventBatch) -> None: + """ + Sort log events in the batch by timestamp. + + Args: + batch: The event batch + """ + batch.log_events = sorted(batch.log_events, key=lambda x: x["timestamp"]) + + def _send_log_batch(self, batch: LogEventBatch) -> None: + """ + Send a batch of log events to CloudWatch Logs. + Creates log group and stream lazily if they don't exist. 
+ + Args: + batch: The event batch + """ + if batch.is_empty(): + return None + + # Sort log events by timestamp + self._sort_log_events(batch) + + # Prepare the PutLogEvents request + put_log_events_input = { + "logGroupName": self.log_group_name, + "logStreamName": self.log_stream_name, + "logEvents": batch.log_events, + } + + start_time = time.time() + + try: + # Make the PutLogEvents call + response = self.logs_client.put_log_events(**put_log_events_input) + + elapsed_ms = int((time.time() - start_time) * 1000) + logger.debug( + "Successfully sent %s log events (%s KB) in %s ms", + batch.size(), + batch.byte_total / 1024, + elapsed_ms, + ) + + return response + + except ClientError as error: + # Handle resource not found errors by creating log group/stream + error_code = error.response.get("Error", {}).get("Code") + if error_code == "ResourceNotFoundException": + logger.info("Log group or stream not found, creating resources and retrying") + + try: + # Create log group first + self._create_log_group_if_needed() + # Then create log stream + self._create_log_stream_if_needed() + + # Retry the PutLogEvents call + response = self.logs_client.put_log_events(**put_log_events_input) + + elapsed_ms = int((time.time() - start_time) * 1000) + logger.debug( + "Successfully sent %s log events (%s KB) in %s ms after creating resources", + batch.size(), + batch.byte_total / 1024, + elapsed_ms, + ) + + return response + + except ClientError as retry_error: + logger.error("Failed to send log events after creating resources: %s", retry_error) + raise + else: + logger.error("Failed to send log events: %s", error) + raise + + def send_log_event(self, log_event: Dict[str, Any]): + """ + Send a log event to CloudWatch Logs. + + This function implements the same logic as the Go version in the OTel Collector. + It batches log events according to CloudWatch Logs constraints and sends them + when the batch is full or spans more than 24 hours. + + Args: + log_event: The log event to send + """ + try: + # Validate the log event + if not self._validate_log_event(log_event): + return + + # Calculate event size + event_size = len(log_event["message"]) + self.CW_PER_EVENT_HEADER_BYTES + + # Initialize event batch if needed + if self._event_batch is None: + self._event_batch = self._create_event_batch() + + # Check if we need to send the current batch and create a new one + current_batch = self._event_batch + if self._event_batch_exceeds_limit(current_batch, event_size) or not self._is_batch_active( + current_batch, log_event["timestamp"] + ): + # Send the current batch + self._send_log_batch(current_batch) + # Create a new batch + self._event_batch = self._create_event_batch() + current_batch = self._event_batch + + # Add the log event to the batch + current_batch.add_event(log_event, event_size) + + except Exception as error: + logger.error("Failed to process log event: %s", error) + raise + + def flush_pending_events(self) -> bool: + """ + Flush any pending log events. 
+ + Returns: + True if successful, False otherwise + """ + if self._event_batch is not None and not self._event_batch.is_empty(): + current_batch = self._event_batch + self._send_log_batch(current_batch) + self._event_batch = self._create_event_batch() + logger.debug("CloudWatchLogClient flushed the buffered log events") + return True diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py index e2e364b03..643897da0 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py @@ -5,34 +5,32 @@ import json import logging +import math import time -import uuid from collections import defaultdict from typing import Any, Dict, List, Optional, Tuple -import botocore.session -from botocore.exceptions import ClientError - -from opentelemetry.sdk.metrics import ( - Counter, - Histogram, - ObservableCounter, - ObservableGauge, - ObservableUpDownCounter, - UpDownCounter, -) +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics import Histogram as HistogramInstr +from opentelemetry.sdk.metrics import ObservableCounter, ObservableGauge, ObservableUpDownCounter, UpDownCounter from opentelemetry.sdk.metrics._internal.point import Metric from opentelemetry.sdk.metrics.export import ( AggregationTemporality, + ExponentialHistogram, Gauge, + Histogram, MetricExporter, MetricExportResult, MetricsData, NumberDataPoint, + Sum, ) +from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation from opentelemetry.sdk.resources import Resource from opentelemetry.util.types import Attributes +from ._cloudwatch_log_client import CloudWatchLogClient + logger = logging.getLogger(__name__) @@ -127,6 +125,7 @@ def __init__( log_stream_name: Optional[str] = None, aws_region: Optional[str] = None, preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None, + preferred_aggregation: Optional[Dict[type, Any]] = None, **kwargs, ): """ @@ -138,64 +137,35 @@ def __init__( log_stream_name: CloudWatch log stream name (auto-generated if None) aws_region: AWS region (auto-detected if None) preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality + preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation **kwargs: Additional arguments passed to botocore client """ # Set up temporality preference default to DELTA if customers not set if preferred_temporality is None: preferred_temporality = { Counter: AggregationTemporality.DELTA, - Histogram: AggregationTemporality.DELTA, + HistogramInstr: AggregationTemporality.DELTA, ObservableCounter: AggregationTemporality.DELTA, ObservableGauge: AggregationTemporality.DELTA, ObservableUpDownCounter: AggregationTemporality.DELTA, UpDownCounter: AggregationTemporality.DELTA, } - super().__init__(preferred_temporality) + # Set up aggregation preference default to exponential histogram for histogram metrics + if preferred_aggregation is None: + preferred_aggregation = { + HistogramInstr: ExponentialBucketHistogramAggregation(), + } + + super().__init__(preferred_temporality, preferred_aggregation) self.namespace = namespace self.log_group_name = log_group_name - self.log_stream_name = log_stream_name or 
self._generate_log_stream_name() - session = botocore.session.Session() - self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs) - - # Ensure log group exists - self._ensure_log_group_exists() - - # Ensure log stream exists - self._ensure_log_stream_exists() - - # Default to unique log stream name matching OTel Collector - # EMF Exporter behavior with language for source identification - def _generate_log_stream_name(self) -> str: - """Generate a unique log stream name.""" - - unique_id = str(uuid.uuid4())[:8] - return f"otel-python-{unique_id}" - - def _ensure_log_group_exists(self): - """Ensure the log group exists, create if it doesn't.""" - try: - self.logs_client.create_log_group(logGroupName=self.log_group_name) - logger.info("Created log group: %s", self.log_group_name) - except ClientError as error: - if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": - logger.debug("Log group %s already exists", self.log_group_name) - else: - logger.error("Failed to create log group %s : %s", self.log_group_name, error) - raise - - def _ensure_log_stream_exists(self): - try: - self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name) - logger.info("Created log stream: %s", self.log_stream_name) - except ClientError as error: - if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": - logger.debug("Log stream %s already exists", self.log_stream_name) - else: - logger.error("Failed to create log stream %s : %s", self.log_group_name, error) - raise + # Initialize CloudWatch Logs client + self.log_client = CloudWatchLogClient( + log_group_name=log_group_name, log_stream_name=log_stream_name, aws_region=aws_region, **kwargs + ) def _get_metric_name(self, record: MetricRecord) -> Optional[str]: """Get the metric name from the metric record or data point.""" @@ -275,8 +245,8 @@ def _create_metric_record(self, metric_name: str, metric_unit: str, metric_descr """ return MetricRecord(metric_name, metric_unit, metric_description) - def _convert_gauge(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: - """Convert a Gauge metric datapoint to a metric record. + def _convert_gauge_and_sum(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: + """Convert a Gauge or Sum metric datapoint to a metric record. 
Args: metric: The metric object @@ -289,30 +259,165 @@ def _convert_gauge(self, metric: Metric, data_point: NumberDataPoint) -> MetricR record = self._create_metric_record(metric.name, metric.unit, metric.description) # Set timestamp - try: - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - except AttributeError: - # data_point doesn't have time_unix_nano attribute - timestamp_ms = int(time.time() * 1000) + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) record.timestamp = timestamp_ms # Set attributes - try: - record.attributes = data_point.attributes - except AttributeError: - # data_point doesn't have attributes - record.attributes = {} + record.attributes = data_point.attributes - # For Gauge, set the value directly - try: - record.value = data_point.value - except AttributeError: - # data_point doesn't have value - record.value = None + # Set the value directly for both Gauge and Sum + record.value = data_point.value + + return record + + def _convert_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: + """Convert a Histogram metric datapoint to a metric record. + + https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and histogram_data + """ + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # For Histogram, set the histogram_data + record.histogram_data = { + "Count": data_point.count, + "Sum": data_point.sum, + "Min": data_point.min, + "Max": data_point.max, + } + return record + + # pylint: disable=too-many-locals + def _convert_exp_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: + """ + Convert an ExponentialHistogram metric datapoint to a metric record. + + This function follows the logic of CalculateDeltaDatapoints in the Go implementation, + converting exponential buckets to their midpoint values. 
+ + Ref: + https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and exp_histogram_data + """ + + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # Initialize arrays for values and counts + array_values = [] + array_counts = [] + + # Get scale + scale = data_point.scale + # Calculate base using the formula: 2^(2^(-scale)) + base = math.pow(2, math.pow(2, float(-scale))) + + # Process positive buckets + if data_point.positive and data_point.positive.bucket_counts: + positive_offset = getattr(data_point.positive, "offset", 0) + positive_bucket_counts = data_point.positive.bucket_counts + + bucket_begin = 0 + bucket_end = 0 + + for bucket_index, count in enumerate(positive_bucket_counts): + index = bucket_index + positive_offset + + if bucket_begin == 0: + bucket_begin = math.pow(base, float(index)) + else: + bucket_begin = bucket_end + + bucket_end = math.pow(base, float(index + 1)) + + # Calculate midpoint value of the bucket + metric_val = (bucket_begin + bucket_end) / 2 + + # Only include buckets with positive counts + if count > 0: + array_values.append(metric_val) + array_counts.append(float(count)) + + # Process zero bucket + zero_count = getattr(data_point, "zero_count", 0) + if zero_count > 0: + array_values.append(0) + array_counts.append(float(zero_count)) + + # Process negative buckets + if data_point.negative and data_point.negative.bucket_counts: + negative_offset = getattr(data_point.negative, "offset", 0) + negative_bucket_counts = data_point.negative.bucket_counts + + bucket_begin = 0 + bucket_end = 0 + + for bucket_index, count in enumerate(negative_bucket_counts): + index = bucket_index + negative_offset + + if bucket_end == 0: + bucket_end = -math.pow(base, float(index)) + else: + bucket_end = bucket_begin + + bucket_begin = -math.pow(base, float(index + 1)) + + # Calculate midpoint value of the bucket + metric_val = (bucket_begin + bucket_end) / 2 + + # Only include buckets with positive counts + if count > 0: + array_values.append(metric_val) + array_counts.append(float(count)) + + # Set the histogram data in the format expected by CloudWatch EMF + record.exp_histogram_data = { + "Values": array_values, + "Counts": array_counts, + "Count": data_point.count, + "Sum": data_point.sum, + "Max": data_point.max, + "Min": data_point.min, + } return record @@ -358,7 +463,11 @@ def _create_emf_log( metric_definitions = [] # Collect attributes from all records (they should be the same for all records in the group) # Only collect once from the first record and apply to all records - all_attributes = metric_records[0].attributes if metric_records and metric_records[0].attributes else {} + all_attributes = ( + metric_records[0].attributes + if metric_records and len(metric_records) > 0 and metric_records[0].attributes + else {} + ) # Process each metric record for record in metric_records: @@ -369,11 +478,6 @@ def _create_emf_log( if not metric_name: continue - # Skip processing if metric value is None or empty - if record.value is None: - logger.debug("Skipping metric %s as it does not have 
valid metric value", metric_name) - continue - # Create metric data dict metric_data = {"Name": metric_name} @@ -381,11 +485,23 @@ def _create_emf_log( if unit: metric_data["Unit"] = unit + # Process different types of aggregations + if record.exp_histogram_data: + # Base2 Exponential Histogram + emf_log[metric_name] = record.exp_histogram_data + elif record.histogram_data: + # Regular Histogram metrics + emf_log[metric_name] = record.histogram_data + elif record.value is not None: + # Gauge, Sum, and other aggregations + emf_log[metric_name] = record.value + else: + logger.debug("Skipping metric %s as it does not have valid metric value", metric_name) + continue + # Add to metric definitions list metric_definitions.append(metric_data) - emf_log[metric_name] = record.value - # Get dimension names from collected attributes dimension_names = self._get_dimension_names(all_attributes) @@ -401,31 +517,18 @@ def _create_emf_log( return emf_log - # pylint: disable=no-member def _send_log_event(self, log_event: Dict[str, Any]): """ - Send a log event to CloudWatch Logs. + Send a log event to CloudWatch Logs using the log client. - Basic implementation for PR 1 - sends individual events directly. - - TODO: Batching event and follow CloudWatch Logs quato constraints - number of events & size limit per payload + Args: + log_event: The log event to send """ - try: - # Send the log event - response = self.logs_client.put_log_events( - logGroupName=self.log_group_name, logStreamName=self.log_stream_name, logEvents=[log_event] - ) - - logger.debug("Successfully sent log event") - return response - - except ClientError as error: - logger.debug("Failed to send log event: %s", error) - raise + self.log_client.send_log_event(log_event) - # pylint: disable=too-many-nested-blocks + # pylint: disable=too-many-nested-blocks,unused-argument,too-many-branches def export( - self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **kwargs: Any + self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **_kwargs: Any ) -> MetricExportResult: """ Export metrics as EMF logs to CloudWatch. @@ -462,9 +565,17 @@ def export( # Process metrics based on type metric_type = type(metric.data) - if metric_type == Gauge: + if metric_type in (Gauge, Sum): for dp in metric.data.data_points: - record = self._convert_gauge(metric, dp) + record = self._convert_gauge_and_sum(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + elif metric_type == Histogram: + for dp in metric.data.data_points: + record = self._convert_histogram(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + elif metric_type == ExponentialHistogram: + for dp in metric.data.data_points: + record = self._convert_exp_histogram(metric, dp) grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) else: logger.debug("Unsupported Metric Type: %s", metric_type) @@ -491,33 +602,30 @@ def export( logger.error("Failed to export metrics: %s", error) return MetricExportResult.FAILURE - def force_flush(self, timeout_millis: int = 10000) -> bool: + def force_flush(self, timeout_millis: int = 10000) -> bool: # pylint: disable=unused-argument """ Force flush any pending metrics. 
- TODO: will add logic to handle gracefule shutdown - Args: timeout_millis: Timeout in milliseconds Returns: True if successful, False otherwise """ + self.log_client.flush_pending_events() logger.debug("AwsCloudWatchEmfExporter force flushes the buffered metrics") return True - def shutdown(self, timeout_millis: Optional[int] = None, **kwargs: Any) -> bool: + def shutdown(self, timeout_millis: Optional[int] = None, **_kwargs: Any) -> bool: """ Shutdown the exporter. Override to handle timeout and other keyword arguments, but do nothing. - TODO: will add logic to handle gracefule shutdown - Args: timeout_millis: Ignored timeout in milliseconds **kwargs: Ignored additional keyword arguments """ - # Intentionally do nothing + # Force flush any remaining batched events self.force_flush(timeout_millis) logger.debug("AwsCloudWatchEmfExporter shutdown called with timeout_millis=%s", timeout_millis) return True diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/metrics/test_aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py similarity index 74% rename from aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/metrics/test_aws_cloudwatch_emf_exporter.py rename to aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py index 01d500c70..7ac67fddf 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/metrics/test_aws_cloudwatch_emf_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py @@ -6,10 +6,8 @@ import unittest from unittest.mock import Mock, patch -from botocore.exceptions import ClientError - from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter -from opentelemetry.sdk.metrics.export import Gauge, MetricExportResult +from opentelemetry.sdk.metrics.export import Gauge, Histogram, MetricExportResult, Sum from opentelemetry.sdk.resources import Resource @@ -31,6 +29,42 @@ def __init__(self, name="test_metric", unit="1", description="Test metric"): self.description = description +class MockHistogramDataPoint(MockDataPoint): + """Mock histogram datapoint for testing.""" + + def __init__(self, count=5, sum_val=25.0, min_val=1.0, max_val=10.0, **kwargs): + super().__init__(**kwargs) + self.count = count + self.sum = sum_val + self.min = min_val + self.max = max_val + + +class MockExpHistogramDataPoint(MockDataPoint): + """Mock exponential histogram datapoint for testing.""" + + def __init__(self, count=10, sum_val=50.0, min_val=1.0, max_val=20.0, scale=2, **kwargs): + super().__init__(**kwargs) + self.count = count + self.sum = sum_val + self.min = min_val + self.max = max_val + self.scale = scale + + # Mock positive buckets + self.positive = Mock() + self.positive.offset = 0 + self.positive.bucket_counts = [1, 2, 3, 4] + + # Mock negative buckets + self.negative = Mock() + self.negative.offset = 0 + self.negative.bucket_counts = [] + + # Mock zero count + self.zero_count = 0 + + class MockGaugeData: """Mock gauge data that passes isinstance checks.""" @@ -38,6 +72,27 @@ def __init__(self, data_points=None): self.data_points = data_points or [] +class MockSumData: + """Mock sum data that passes isinstance checks.""" + + def __init__(self, data_points=None): + self.data_points = data_points or [] + + +class 
MockHistogramData: + """Mock histogram data that passes isinstance checks.""" + + def __init__(self, data_points=None): + self.data_points = data_points or [] + + +class MockExpHistogramData: + """Mock exponential histogram data that passes isinstance checks.""" + + def __init__(self, data_points=None): + self.data_points = data_points or [] + + class MockMetricWithData: """Mock metric with data attribute.""" @@ -76,16 +131,14 @@ def setUp(self): mock_session_instance = Mock() mock_session.return_value = mock_session_instance mock_session_instance.create_client.return_value = mock_client - mock_client.create_log_group.return_value = {} - mock_client.create_log_stream.return_value = {} self.exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") def test_initialization(self): """Test exporter initialization.""" self.assertEqual(self.exporter.namespace, "TestNamespace") - self.assertIsNotNone(self.exporter.log_stream_name) self.assertEqual(self.exporter.log_group_name, "test-log-group") + self.assertIsNotNone(self.exporter.log_client) @patch("botocore.session.Session") def test_initialization_with_custom_params(self, mock_session): @@ -95,8 +148,6 @@ def test_initialization_with_custom_params(self, mock_session): mock_session_instance = Mock() mock_session.return_value = mock_session_instance mock_session_instance.create_client.return_value = mock_client - mock_client.create_log_group.return_value = {} - mock_client.create_log_stream.return_value = {} exporter = AwsCloudWatchEmfExporter( namespace="CustomNamespace", @@ -106,7 +157,6 @@ def test_initialization_with_custom_params(self, mock_session): ) self.assertEqual(exporter.namespace, "CustomNamespace") self.assertEqual(exporter.log_group_name, "custom-log-group") - self.assertEqual(exporter.log_stream_name, "custom-stream") def test_get_unit_mapping(self): """Test unit mapping functionality.""" @@ -205,16 +255,6 @@ def test_group_by_attributes_and_timestamp(self): self.assertEqual(len(result), 2) self.assertEqual(result[1], 1234567890) - def test_generate_log_stream_name(self): - """Test log stream name generation.""" - name1 = self.exporter._generate_log_stream_name() - name2 = self.exporter._generate_log_stream_name() - - # Should generate unique names - self.assertNotEqual(name1, name2) - self.assertTrue(name1.startswith("otel-python-")) - self.assertTrue(name2.startswith("otel-python-")) - def test_normalize_timestamp(self): """Test timestamp normalization.""" timestamp_ns = 1609459200000000000 # 2021-01-01 00:00:00 in nanoseconds @@ -237,7 +277,7 @@ def test_convert_gauge(self): metric = MockMetric("gauge_metric", "Count", "Gauge description") dp = MockDataPoint(value=42.5, attributes={"key": "value"}) - record = self.exporter._convert_gauge(metric, dp) + record = self.exporter._convert_gauge_and_sum(metric, dp) self.assertIsNotNone(record) self.assertEqual(record.name, "gauge_metric") @@ -245,6 +285,58 @@ def test_convert_gauge(self): self.assertEqual(record.attributes, {"key": "value"}) self.assertIsInstance(record.timestamp, int) + def test_convert_sum(self): + """Test sum conversion.""" + metric = MockMetric("sum_metric", "Count", "Sum description") + dp = MockDataPoint(value=100.0, attributes={"env": "test"}) + + record = self.exporter._convert_gauge_and_sum(metric, dp) + + self.assertIsNotNone(record) + self.assertEqual(record.name, "sum_metric") + self.assertEqual(record.value, 100.0) + self.assertEqual(record.attributes, {"env": "test"}) + self.assertIsInstance(record.timestamp, int) + 
+ def test_convert_histogram(self): + """Test histogram conversion.""" + metric = MockMetric("histogram_metric", "ms", "Histogram description") + dp = MockHistogramDataPoint( + count=10, sum_val=150.0, min_val=5.0, max_val=25.0, attributes={"region": "us-east-1"} + ) + + record = self.exporter._convert_histogram(metric, dp) + + self.assertIsNotNone(record) + self.assertEqual(record.name, "histogram_metric") + self.assertTrue(hasattr(record, "histogram_data")) + + expected_value = {"Count": 10, "Sum": 150.0, "Min": 5.0, "Max": 25.0} + self.assertEqual(record.histogram_data, expected_value) + self.assertEqual(record.attributes, {"region": "us-east-1"}) + self.assertIsInstance(record.timestamp, int) + + def test_convert_exp_histogram(self): + """Test exponential histogram conversion.""" + metric = MockMetric("exp_histogram_metric", "s", "Exponential histogram description") + dp = MockExpHistogramDataPoint(count=8, sum_val=64.0, min_val=2.0, max_val=32.0, attributes={"service": "api"}) + + record = self.exporter._convert_exp_histogram(metric, dp) + + self.assertIsNotNone(record) + self.assertEqual(record.name, "exp_histogram_metric") + self.assertTrue(hasattr(record, "exp_histogram_data")) + + exp_data = record.exp_histogram_data + self.assertIn("Values", exp_data) + self.assertIn("Counts", exp_data) + self.assertEqual(exp_data["Count"], 8) + self.assertEqual(exp_data["Sum"], 64.0) + self.assertEqual(exp_data["Min"], 2.0) + self.assertEqual(exp_data["Max"], 32.0) + self.assertEqual(record.attributes, {"service": "api"}) + self.assertIsInstance(record.timestamp, int) + def test_create_emf_log(self): """Test EMF log creation.""" # Create test records @@ -263,71 +355,6 @@ def test_create_emf_log(self): # Check that the result is JSON serializable json.dumps(result) # Should not raise exception - @patch("botocore.session.Session") - def test_export_success(self, mock_session): - """Test successful export.""" - # Mock CloudWatch Logs client - mock_client = Mock() - mock_session_instance = Mock() - mock_session.return_value = mock_session_instance - mock_session_instance.create_client.return_value = mock_client - mock_client.put_log_events.return_value = {"nextSequenceToken": "12345"} - - # Create empty metrics data to test basic export flow - metrics_data = Mock() - metrics_data.resource_metrics = [] - - result = self.exporter.export(metrics_data) - - self.assertEqual(result, MetricExportResult.SUCCESS) - - def test_export_failure(self): - """Test export failure handling.""" - # Create metrics data that will cause an exception during iteration - metrics_data = Mock() - # Make resource_metrics raise an exception when iterated over - metrics_data.resource_metrics = Mock() - metrics_data.resource_metrics.__iter__ = Mock(side_effect=Exception("Test exception")) - - result = self.exporter.export(metrics_data) - - self.assertEqual(result, MetricExportResult.FAILURE) - - def test_force_flush_no_pending_events(self): - """Test force flush functionality with no pending events.""" - result = self.exporter.force_flush() - - self.assertTrue(result) - - @patch.object(AwsCloudWatchEmfExporter, "force_flush") - def test_shutdown(self, mock_force_flush): - """Test shutdown functionality.""" - mock_force_flush.return_value = True - - result = self.exporter.shutdown(timeout_millis=5000) - - self.assertTrue(result) - mock_force_flush.assert_called_once_with(5000) - - def test_send_log_event_method_exists(self): - """Test that _send_log_event method exists and can be called.""" - # Just test that the method exists and 
doesn't crash with basic input - log_event = {"message": "test message", "timestamp": 1234567890} - - # Mock the AWS client methods to avoid actual AWS calls - with patch.object(self.exporter.logs_client, "create_log_group"): - with patch.object(self.exporter.logs_client, "create_log_stream"): - with patch.object(self.exporter.logs_client, "put_log_events") as mock_put: - mock_put.return_value = {"nextSequenceToken": "12345"} - - # Should not raise an exception - try: - response = self.exporter._send_log_event(log_event) - # Response may be None or a dict, both are acceptable - self.assertTrue(response is None or isinstance(response, dict)) - except ClientError as error: - self.fail(f"_send_log_event raised an exception: {error}") - def test_create_emf_log_with_resource(self): """Test EMF log creation with resource attributes.""" # Create test records @@ -364,58 +391,6 @@ def test_create_emf_log_with_resource(self): self.assertEqual(set(cw_metrics["Dimensions"][0]), {"env", "service"}) self.assertEqual(cw_metrics["Metrics"][0]["Name"], "gauge_metric") - @patch("botocore.session.Session") - def test_export_with_gauge_metrics(self, mock_session): - """Test exporting actual gauge metrics.""" - # Mock CloudWatch Logs client - mock_client = Mock() - mock_session_instance = Mock() - mock_session.return_value = mock_session_instance - mock_session_instance.create_client.return_value = mock_client - mock_client.put_log_events.return_value = {"nextSequenceToken": "12345"} - mock_client.create_log_group.side_effect = ClientError( - {"Error": {"Code": "ResourceAlreadyExistsException"}}, "CreateLogGroup" - ) - mock_client.create_log_stream.side_effect = ClientError( - {"Error": {"Code": "ResourceAlreadyExistsException"}}, "CreateLogStream" - ) - - # Create mock metrics data - resource = Resource.create({"service.name": "test-service"}) - - # Create gauge data - gauge_data = Gauge(data_points=[MockDataPoint(value=42.0, attributes={"key": "value"})]) - - metric = MockMetricWithData(name="test_gauge", data=gauge_data) - - scope_metrics = MockScopeMetrics(metrics=[metric]) - resource_metrics = MockResourceMetrics(resource=resource, scope_metrics=[scope_metrics]) - - metrics_data = Mock() - metrics_data.resource_metrics = [resource_metrics] - - result = self.exporter.export(metrics_data) - - self.assertEqual(result, MetricExportResult.SUCCESS) - # Test validates that export works with gauge metrics - - def test_get_metric_name_fallback(self): - """Test metric name extraction fallback.""" - # Test with record that has no instrument attribute - record = Mock(spec=[]) - - result = self.exporter._get_metric_name(record) - self.assertIsNone(result) - - def test_get_metric_name_empty_name(self): - """Test metric name extraction with empty name.""" - # Test with record that has empty name - record = Mock() - record.name = "" - - result = self.exporter._get_metric_name(record) - self.assertIsNone(result) - def test_create_emf_log_skips_empty_metric_names(self): """Test that EMF log creation skips records with empty metric names.""" # Create a record with no metric name @@ -445,63 +420,94 @@ def test_create_emf_log_skips_empty_metric_names(self): metric_names = [m["Name"] for m in cw_metrics["Metrics"]] self.assertIn("valid_metric", metric_names) - @patch("os.environ.get") @patch("botocore.session.Session") - def test_initialization_with_env_region(self, mock_session, mock_env_get): - """Test initialization with AWS region from environment.""" - # Mock environment variable - mock_env_get.side_effect = lambda key: 
"us-west-1" if key == "AWS_REGION" else None - - # Mock the botocore session to avoid AWS calls + def test_export_success(self, mock_session): + """Test successful export.""" + # Mock CloudWatch Logs client mock_client = Mock() mock_session_instance = Mock() mock_session.return_value = mock_session_instance mock_session_instance.create_client.return_value = mock_client - mock_client.create_log_group.return_value = {} - mock_client.create_log_stream.return_value = {} + mock_client.put_log_events.return_value = {"nextSequenceToken": "12345"} - exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + # Create empty metrics data to test basic export flow + metrics_data = Mock() + metrics_data.resource_metrics = [] - # Just verify the exporter was created successfully with region handling - self.assertIsNotNone(exporter) - self.assertEqual(exporter.namespace, "TestNamespace") + result = self.exporter.export(metrics_data) - @patch("botocore.session.Session") - def test_ensure_log_group_exists_create_failure(self, mock_session): - """Test log group creation failure.""" - # Mock the botocore session - mock_client = Mock() - mock_session_instance = Mock() - mock_session.return_value = mock_session_instance - mock_session_instance.create_client.return_value = mock_client + self.assertEqual(result, MetricExportResult.SUCCESS) - # Make create fail with access denied error - mock_client.create_log_group.side_effect = ClientError({"Error": {"Code": "AccessDenied"}}, "CreateLogGroup") - mock_client.create_log_stream.return_value = {} + def test_export_failure(self): + """Test export failure handling.""" + # Create metrics data that will cause an exception during iteration + metrics_data = Mock() + # Make resource_metrics raise an exception when iterated over + metrics_data.resource_metrics = Mock() + metrics_data.resource_metrics.__iter__ = Mock(side_effect=Exception("Test exception")) - with self.assertRaises(ClientError): - AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + result = self.exporter.export(metrics_data) - @patch("botocore.session.Session") - def test_ensure_log_group_exists_success(self, mock_session): - """Test log group existence check when log group already exists.""" - # Mock the botocore session - mock_client = Mock() - mock_session_instance = Mock() - mock_session.return_value = mock_session_instance - mock_session_instance.create_client.return_value = mock_client + self.assertEqual(result, MetricExportResult.FAILURE) - # Make create fail with ResourceAlreadyExistsException (log group exists) - mock_client.create_log_group.side_effect = ClientError( - {"Error": {"Code": "ResourceAlreadyExistsException"}}, "CreateLogGroup" - ) - mock_client.create_log_stream.return_value = {} + def test_export_with_gauge_metrics(self): + """Test exporting actual gauge metrics.""" + # Create mock metrics data + resource = Resource.create({"service.name": "test-service"}) - # This should not raise an exception - exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") - self.assertIsNotNone(exporter) - # Verify create was called once - mock_client.create_log_group.assert_called_once_with(logGroupName="test-log-group") + # Create gauge data + gauge_data = Gauge(data_points=[MockDataPoint(value=42.0, attributes={"key": "value"})]) + + metric = MockMetricWithData(name="test_gauge", data=gauge_data) + + scope_metrics = MockScopeMetrics(metrics=[metric]) + resource_metrics = 
MockResourceMetrics(resource=resource, scope_metrics=[scope_metrics]) + + metrics_data = Mock() + metrics_data.resource_metrics = [resource_metrics] + + result = self.exporter.export(metrics_data) + + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_export_with_sum_metrics(self): + """Test export with Sum metrics.""" + # Create mock metrics data with Sum type + resource = Resource.create({"service.name": "test-service"}) + + sum_data = MockSumData([MockDataPoint(value=25.0, attributes={"env": "test"})]) + # Create a mock that will pass the type() check for Sum + sum_data.__class__ = Sum + metric = MockMetricWithData(name="test_sum", data=sum_data) + + scope_metrics = MockScopeMetrics(metrics=[metric]) + resource_metrics = MockResourceMetrics(resource=resource, scope_metrics=[scope_metrics]) + + metrics_data = Mock() + metrics_data.resource_metrics = [resource_metrics] + + result = self.exporter.export(metrics_data) + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_export_with_histogram_metrics(self): + """Test export with Histogram metrics.""" + # Create mock metrics data with Histogram type + resource = Resource.create({"service.name": "test-service"}) + + hist_dp = MockHistogramDataPoint(count=5, sum_val=25.0, min_val=1.0, max_val=10.0, attributes={"env": "test"}) + hist_data = MockHistogramData([hist_dp]) + # Create a mock that will pass the type() check for Histogram + hist_data.__class__ = Histogram + metric = MockMetricWithData(name="test_histogram", data=hist_data) + + scope_metrics = MockScopeMetrics(metrics=[metric]) + resource_metrics = MockResourceMetrics(resource=resource, scope_metrics=[scope_metrics]) + + metrics_data = Mock() + metrics_data.resource_metrics = [resource_metrics] + + result = self.exporter.export(metrics_data) + self.assertEqual(result, MetricExportResult.SUCCESS) def test_export_with_unsupported_metric_type(self): """Test export with unsupported metric types.""" @@ -542,6 +548,73 @@ def test_export_with_metric_without_data(self): result = self.exporter.export(metrics_data) self.assertEqual(result, MetricExportResult.SUCCESS) + def test_get_metric_name_fallback(self): + """Test metric name extraction fallback.""" + # Test with record that has no instrument attribute + record = Mock(spec=[]) + + result = self.exporter._get_metric_name(record) + self.assertIsNone(result) + + def test_get_metric_name_empty_name(self): + """Test metric name extraction with empty name.""" + # Test with record that has empty name + record = Mock() + record.name = "" + + result = self.exporter._get_metric_name(record) + self.assertIsNone(result) + + @patch("os.environ.get") + @patch("botocore.session.Session") + def test_initialization_with_env_region(self, mock_session, mock_env_get): + """Test initialization with AWS region from environment.""" + # Mock environment variable + mock_env_get.side_effect = lambda key: "us-west-1" if key == "AWS_REGION" else None + + # Mock the botocore session to avoid AWS calls + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + + exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + + # Just verify the exporter was created successfully with region handling + self.assertIsNotNone(exporter) + self.assertEqual(exporter.namespace, "TestNamespace") + + def test_force_flush_no_pending_events(self): + """Test force flush functionality with no pending 
events.""" + result = self.exporter.force_flush() + + self.assertTrue(result) + + @patch.object(AwsCloudWatchEmfExporter, "force_flush") + def test_shutdown(self, mock_force_flush): + """Test shutdown functionality.""" + mock_force_flush.return_value = True + + result = self.exporter.shutdown(timeout_millis=5000) + + self.assertTrue(result) + mock_force_flush.assert_called_once_with(5000) + + # pylint: disable=broad-exception-caught + def test_send_log_event_method_exists(self): + """Test that _send_log_event method exists and can be called.""" + # Just test that the method exists and doesn't crash with basic input + log_event = {"message": "test message", "timestamp": 1234567890} + + # Mock the log client to avoid actual AWS calls + with patch.object(self.exporter.log_client, "send_log_event") as mock_send: + # Should not raise an exception + try: + self.exporter._send_log_event(log_event) + mock_send.assert_called_once_with(log_event) + except Exception as error: + self.fail(f"_send_log_event raised an exception: {error}") + if __name__ == "__main__": unittest.main() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py new file mode 100644 index 000000000..0215962db --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py @@ -0,0 +1,583 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=too-many-public-methods + +import time +import unittest +from unittest.mock import Mock, patch + +from botocore.exceptions import ClientError + +from amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client import CloudWatchLogClient + + +class TestCloudWatchLogClient(unittest.TestCase): + """Test CloudWatchLogClient class.""" + + def setUp(self): + """Set up test fixtures.""" + # Mock the botocore session to avoid AWS calls + with patch("botocore.session.Session") as mock_session: + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + + self.log_client = CloudWatchLogClient(log_group_name="test-log-group") + + def test_initialization(self): + """Test log client initialization.""" + self.assertEqual(self.log_client.log_group_name, "test-log-group") + self.assertIsNotNone(self.log_client.log_stream_name) + self.assertTrue(self.log_client.log_stream_name.startswith("otel-python-")) + + @patch("botocore.session.Session") + def test_initialization_with_custom_params(self, mock_session): + """Test log client initialization with custom parameters.""" + # Mock the botocore session to avoid AWS calls + mock_client = Mock() + mock_session_instance = Mock() + mock_session.return_value = mock_session_instance + mock_session_instance.create_client.return_value = mock_client + + log_client = CloudWatchLogClient( + log_group_name="custom-log-group", + log_stream_name="custom-stream", + aws_region="us-west-2", + ) + self.assertEqual(log_client.log_group_name, "custom-log-group") + self.assertEqual(log_client.log_stream_name, "custom-stream") + + def test_generate_log_stream_name(self): + """Test log stream name generation.""" + name1 = self.log_client._generate_log_stream_name() + name2 = self.log_client._generate_log_stream_name() + + # Should generate 
unique names
+        self.assertNotEqual(name1, name2)
+        self.assertTrue(name1.startswith("otel-python-"))
+        self.assertTrue(name2.startswith("otel-python-"))
+
+    def test_create_log_group_if_needed_success(self):
+        """Test log group creation when needed."""
+        # This method should not raise an exception
+        self.log_client._create_log_group_if_needed()
+
+    def test_create_log_group_if_needed_already_exists(self):
+        """Test log group creation when it already exists."""
+        # Mock the create_log_group to raise ResourceAlreadyExistsException
+        self.log_client.logs_client.create_log_group.side_effect = ClientError(
+            {"Error": {"Code": "ResourceAlreadyExistsException"}}, "CreateLogGroup"
+        )
+
+        # This should not raise an exception
+        self.log_client._create_log_group_if_needed()
+
+    def test_create_log_group_if_needed_failure(self):
+        """Test log group creation failure."""
+        # Mock the create_log_group to raise AccessDenied error
+        self.log_client.logs_client.create_log_group.side_effect = ClientError(
+            {"Error": {"Code": "AccessDenied"}}, "CreateLogGroup"
+        )
+
+        with self.assertRaises(ClientError):
+            self.log_client._create_log_group_if_needed()
+
+    def test_create_event_batch(self):
+        """Test event batch creation."""
+        batch = self.log_client._create_event_batch()
+
+        self.assertEqual(batch.log_events, [])
+        self.assertEqual(batch.byte_total, 0)
+        self.assertEqual(batch.min_timestamp_ms, 0)
+        self.assertEqual(batch.max_timestamp_ms, 0)
+        self.assertIsInstance(batch.created_timestamp_ms, int)
+
+    def test_validate_log_event_valid(self):
+        """Test log event validation with valid event."""
+        log_event = {"message": "test message", "timestamp": int(time.time() * 1000)}
+
+        result = self.log_client._validate_log_event(log_event)
+        self.assertTrue(result)
+
+    def test_validate_log_event_empty_message(self):
+        """Test log event validation with empty message."""
+        log_event = {"message": "", "timestamp": int(time.time() * 1000)}
+
+        result = self.log_client._validate_log_event(log_event)
+        self.assertFalse(result)
+
+        # Test whitespace-only message
+        whitespace_event = {"message": " ", "timestamp": int(time.time() * 1000)}
+        result = self.log_client._validate_log_event(whitespace_event)
+        self.assertFalse(result)
+
+        # Test missing message key
+        missing_message_event = {"timestamp": int(time.time() * 1000)}
+        result = self.log_client._validate_log_event(missing_message_event)
+        self.assertFalse(result)
+
+    def test_validate_log_event_oversized_message(self):
+        """Test log event validation with oversized message."""
+        # Create a message larger than the maximum allowed size
+        large_message = "x" * (self.log_client.CW_MAX_EVENT_PAYLOAD_BYTES + 100)
+        log_event = {"message": large_message, "timestamp": int(time.time() * 1000)}
+
+        result = self.log_client._validate_log_event(log_event)
+        self.assertTrue(result)  # Should still be valid after truncation
+        # Check that message was truncated
+        self.assertLess(len(log_event["message"]), len(large_message))
+        self.assertTrue(log_event["message"].endswith(self.log_client.CW_TRUNCATED_SUFFIX))
+
+    def test_validate_log_event_old_timestamp(self):
+        """Test log event validation with very old timestamp."""
+        # Timestamp from 15 days ago
+        old_timestamp = int(time.time() * 1000) - (15 * 24 * 60 * 60 * 1000)
+        log_event = {"message": "test message", "timestamp": old_timestamp}
+
+        result = self.log_client._validate_log_event(log_event)
+        self.assertFalse(result)
+
+    def test_validate_log_event_future_timestamp(self):
+        """Test log event validation with future timestamp."""
+        # Timestamp 3 hours in the future
+        future_timestamp = int(time.time() * 1000) + (3 * 60 * 60 * 1000)
+        log_event = {"message": "test message", "timestamp": future_timestamp}
+
+        result = self.log_client._validate_log_event(log_event)
+        self.assertFalse(result)
+
+    def test_event_batch_exceeds_limit_by_count(self):
+        """Test batch limit checking by event count."""
+        batch = self.log_client._create_event_batch()
+        # Simulate batch with maximum events
+        for _ in range(self.log_client.CW_MAX_REQUEST_EVENT_COUNT):
+            batch.add_event({"message": "test", "timestamp": int(time.time() * 1000)}, 10)
+
+        result = self.log_client._event_batch_exceeds_limit(batch, 100)
+        self.assertTrue(result)
+
+    def test_event_batch_exceeds_limit_by_size(self):
+        """Test batch limit checking by byte size."""
+        batch = self.log_client._create_event_batch()
+        # Manually set byte_total to near limit
+        batch.byte_total = self.log_client.CW_MAX_REQUEST_PAYLOAD_BYTES - 50
+
+        result = self.log_client._event_batch_exceeds_limit(batch, 100)
+        self.assertTrue(result)
+
+    def test_event_batch_within_limits(self):
+        """Test batch limit checking within limits."""
+        batch = self.log_client._create_event_batch()
+        for _ in range(10):
+            batch.add_event({"message": "test", "timestamp": int(time.time() * 1000)}, 100)
+
+        result = self.log_client._event_batch_exceeds_limit(batch, 100)
+        self.assertFalse(result)
+
+    def test_is_batch_active_new_batch(self):
+        """Test batch activity check for new batch."""
+        batch = self.log_client._create_event_batch()
+        current_time = int(time.time() * 1000)
+
+        result = self.log_client._is_batch_active(batch, current_time)
+        self.assertTrue(result)
+
+    def test_is_batch_active_24_hour_span(self):
+        """Test batch activity check for 24+ hour span."""
+        batch = self.log_client._create_event_batch()
+        current_time = int(time.time() * 1000)
+        # Add an event to set the timestamps
+        batch.add_event({"message": "test", "timestamp": current_time}, 10)
+
+        # Test with timestamp 25 hours in the future
+        future_timestamp = current_time + (25 * 60 * 60 * 1000)
+
+        result = self.log_client._is_batch_active(batch, future_timestamp)
+        self.assertFalse(result)
+
+    def test_log_event_batch_add_event(self):
+        """Test adding log event to batch."""
+        batch = self.log_client._create_event_batch()
+        log_event = {"message": "test message", "timestamp": int(time.time() * 1000)}
+        event_size = 100
+
+        batch.add_event(log_event, event_size)
+
+        self.assertEqual(batch.size(), 1)
+        self.assertEqual(batch.byte_total, event_size)
+        self.assertEqual(batch.min_timestamp_ms, log_event["timestamp"])
+        self.assertEqual(batch.max_timestamp_ms, log_event["timestamp"])
+
+    def test_sort_log_events(self):
+        """Test sorting log events by timestamp."""
+        batch = self.log_client._create_event_batch()
+        current_time = int(time.time() * 1000)
+
+        # Events with timestamps deliberately out of order
+        events = [
+            {"message": "third", "timestamp": current_time + 2000},
+            {"message": "first", "timestamp": current_time},
+            {"message": "second", "timestamp": current_time + 1000},
+        ]
+
+        # Add events to batch in unsorted order
+        for event in events:
+            batch.add_event(event, 10)
+
+        self.log_client._sort_log_events(batch)
+
+        # Check that events are now sorted by timestamp
+        self.assertEqual(batch.log_events[0]["message"], "first")
+        self.assertEqual(batch.log_events[1]["message"], "second")
+        self.assertEqual(batch.log_events[2]["message"], "third")
+
+    @patch.object(CloudWatchLogClient, "_send_log_batch")
+    def test_flush_pending_events_with_pending_events(self, mock_send_batch):
+        """Test flush pending events functionality with pending events."""
+        # Create a batch with events
+        self.log_client._event_batch = self.log_client._create_event_batch()
+        self.log_client._event_batch.add_event({"message": "test", "timestamp": int(time.time() * 1000)}, 10)
+
+        result = self.log_client.flush_pending_events()
+
+        self.assertTrue(result)
+        mock_send_batch.assert_called_once()
+
+    def test_flush_pending_events_no_pending_events(self):
+        """Test flush pending events functionality with no pending events."""
+        # No batch exists
+        self.assertIsNone(self.log_client._event_batch)
+
+        result = self.log_client.flush_pending_events()
+
+        self.assertTrue(result)
+
+    def test_send_log_event_method_exists(self):
+        """Test that send_log_event method exists and can be called."""
+        # Just test that the method exists and doesn't crash with basic input
+        log_event = {"message": "test message", "timestamp": 1234567890}
+
+        # Mock the AWS client methods to avoid actual AWS calls
+        with patch.object(self.log_client.logs_client, "put_log_events") as mock_put:
+            mock_put.return_value = {"nextSequenceToken": "12345"}
+
+            # Should not raise an exception
+            try:
+                self.log_client.send_log_event(log_event)
+                # Method should complete without error
+            except ClientError as error:
+                self.fail(f"send_log_event raised an exception: {error}")
+
+    def test_send_log_batch_with_resource_not_found(self):
+        """Test lazy creation when put_log_events fails with ResourceNotFoundException."""
+        batch = self.log_client._create_event_batch()
+        batch.add_event({"message": "test message", "timestamp": int(time.time() * 1000)}, 10)
+
+        # Mock put_log_events to fail first, then succeed
+        mock_put = self.log_client.logs_client.put_log_events
+        mock_put.side_effect = [
+            ClientError({"Error": {"Code": "ResourceNotFoundException"}}, "PutLogEvents"),
+            {"nextSequenceToken": "12345"},
+        ]
+
+        # Mock the create methods
+        mock_create_group = Mock()
+        mock_create_stream = Mock()
+        self.log_client._create_log_group_if_needed = mock_create_group
+        self.log_client._create_log_stream_if_needed = mock_create_stream
+
+        # Should not raise an exception and should create resources
+        self.log_client._send_log_batch(batch)
+
+        # Verify that creation methods were called
+        mock_create_group.assert_called_once()
+        mock_create_stream.assert_called_once()
+
+        # Verify put_log_events was called twice (initial attempt + retry)
+        self.assertEqual(mock_put.call_count, 2)
+
+    def test_send_log_batch_with_other_error(self):
+        """Test that non-ResourceNotFoundException errors are re-raised."""
+        batch = self.log_client._create_event_batch()
+        batch.add_event({"message": "test message", "timestamp": int(time.time() * 1000)}, 10)
+
+        # Mock put_log_events to fail with different error
+        self.log_client.logs_client.put_log_events.side_effect = ClientError(
+            {"Error": {"Code": "AccessDenied"}}, "PutLogEvents"
+        )
+
+        # Should raise the original exception
+        with self.assertRaises(ClientError):
+            self.log_client._send_log_batch(batch)
+
+    def test_create_log_stream_if_needed_success(self):
+        """Test log stream creation when needed."""
+        # This method should not raise an exception
+        self.log_client._create_log_stream_if_needed()
+
+    def test_create_log_stream_if_needed_already_exists(self):
+        """Test log stream creation when it already exists."""
+        # Mock the create_log_stream to raise ResourceAlreadyExistsException
+        self.log_client.logs_client.create_log_stream.side_effect = ClientError(
+            {"Error": {"Code": "ResourceAlreadyExistsException"}}, "CreateLogStream"
+        )
+
+        # This should not raise an exception
+        self.log_client._create_log_stream_if_needed()
+
+    def test_create_log_stream_if_needed_failure(self):
+        """Test log stream creation failure."""
+        # Mock the create_log_stream to raise AccessDenied error
+        self.log_client.logs_client.create_log_stream.side_effect = ClientError(
+            {"Error": {"Code": "AccessDenied"}}, "CreateLogStream"
+        )
+
+        with self.assertRaises(ClientError):
+            self.log_client._create_log_stream_if_needed()
+
+    def test_send_log_batch_success(self):
+        """Test successful log batch sending."""
+        batch = self.log_client._create_event_batch()
+        batch.add_event({"message": "test message", "timestamp": int(time.time() * 1000)}, 10)
+
+        # Mock successful put_log_events call
+        self.log_client.logs_client.put_log_events.return_value = {"nextSequenceToken": "12345"}
+
+        # Should not raise an exception
+        result = self.log_client._send_log_batch(batch)
+        self.assertEqual(result["nextSequenceToken"], "12345")
+
+    def test_send_log_batch_empty_batch(self):
+        """Test sending empty batch does nothing."""
+        batch = self.log_client._create_event_batch()
+        # Empty batch should return early without calling AWS
+
+        result = self.log_client._send_log_batch(batch)
+        self.assertIsNone(result)
+
+        # Verify put_log_events was not called
+        self.log_client.logs_client.put_log_events.assert_not_called()
+
+    def test_is_batch_active_flush_interval_reached(self):
+        """Test batch activity check when flush interval is reached."""
+        batch = self.log_client._create_event_batch()
+        current_time = int(time.time() * 1000)
+
+        # Set the batch creation time to more than flush interval ago
+        batch.created_timestamp_ms = current_time - (self.log_client.BATCH_FLUSH_INTERVAL + 1000)
+        # Add an event to set the timestamps
+        batch.add_event({"message": "test", "timestamp": current_time}, 10)
+
+        result = self.log_client._is_batch_active(batch, current_time)
+        self.assertFalse(result)
+
+    def test_send_log_event_with_invalid_event(self):
+        """Test send_log_event with an invalid event that fails validation."""
+        # Create an event that will fail validation (empty message)
+        log_event = {"message": "", "timestamp": int(time.time() * 1000)}
+
+        # Should not raise an exception, but should not call put_log_events
+        self.log_client.send_log_event(log_event)
+
+        # Verify put_log_events was not called due to validation failure
+        self.log_client.logs_client.put_log_events.assert_not_called()
+
+    def test_send_log_event_batching_logic(self):
+        """Test that send_log_event properly batches events."""
+        log_event = {"message": "test message", "timestamp": int(time.time() * 1000)}
+
+        # Mock put_log_events return value; it should not be needed yet because the event is only batched
+        self.log_client.logs_client.put_log_events.return_value = {"nextSequenceToken": "12345"}
+
+        # Send one event (should be batched, not sent immediately)
+        self.log_client.send_log_event(log_event)
+
+        # Verify event was added to batch
+        self.assertIsNotNone(self.log_client._event_batch)
+        self.assertEqual(self.log_client._event_batch.size(), 1)
+
+        # put_log_events should not be called yet (event is batched)
+        self.log_client.logs_client.put_log_events.assert_not_called()
+
+    def test_send_log_event_force_batch_send(self):
+        """Test that send_log_event sends batch when limits are exceeded."""
+        # Mock put_log_events
+        self.log_client.logs_client.put_log_events.return_value = {"nextSequenceToken": "12345"}
+
+        # Create events to reach the maximum event count limit
+        current_time = int(time.time() * 1000)
+
+        # Send events up to the limit (should all be batched)
+        for event_index in range(self.log_client.CW_MAX_REQUEST_EVENT_COUNT):
+            log_event = {"message": f"test message {event_index}", "timestamp": current_time}
+            self.log_client.send_log_event(log_event)
+
+        # At this point, no batch should have been sent yet
+        self.log_client.logs_client.put_log_events.assert_not_called()
+
+        # Send one more event (should trigger batch send due to count limit)
+        final_event = {"message": "final message", "timestamp": current_time}
+        self.log_client.send_log_event(final_event)
+
+        # put_log_events should have been called once
+        self.log_client.logs_client.put_log_events.assert_called_once()
+
+    def test_log_event_batch_clear(self):
+        """Test clearing a log event batch."""
+        batch = self.log_client._create_event_batch()
+        batch.add_event({"message": "test", "timestamp": int(time.time() * 1000)}, 100)
+
+        # Verify batch has content
+        self.assertFalse(batch.is_empty())
+        self.assertEqual(batch.size(), 1)
+
+        # Clear and verify
+        batch.clear()
+        self.assertTrue(batch.is_empty())
+        self.assertEqual(batch.size(), 0)
+        self.assertEqual(batch.byte_total, 0)
+
+    def test_log_event_batch_timestamp_tracking(self):
+        """Test timestamp tracking in LogEventBatch."""
+        batch = self.log_client._create_event_batch()
+        current_time = int(time.time() * 1000)
+
+        # Add first event
+        batch.add_event({"message": "first", "timestamp": current_time}, 10)
+        self.assertEqual(batch.min_timestamp_ms, current_time)
+        self.assertEqual(batch.max_timestamp_ms, current_time)
+
+        # Add earlier event
+        earlier_time = current_time - 1000
+        batch.add_event({"message": "earlier", "timestamp": earlier_time}, 10)
+        self.assertEqual(batch.min_timestamp_ms, earlier_time)
+        self.assertEqual(batch.max_timestamp_ms, current_time)
+
+        # Add later event
+        later_time = current_time + 1000
+        batch.add_event({"message": "later", "timestamp": later_time}, 10)
+        self.assertEqual(batch.min_timestamp_ms, earlier_time)
+        self.assertEqual(batch.max_timestamp_ms, later_time)
+
+    def test_generate_log_stream_name_format(self):
+        """Test log stream name generation format and uniqueness."""
+        name = self.log_client._generate_log_stream_name()
+        self.assertTrue(name.startswith("otel-python-"))
+        self.assertEqual(len(name), len("otel-python-") + 8)
+
+        # Generate another and ensure they're different
+        name2 = self.log_client._generate_log_stream_name()
+        self.assertNotEqual(name, name2)
+
+    @patch("botocore.session.Session")
+    def test_initialization_with_custom_log_stream_name(self, mock_session):
+        """Test initialization with custom log stream name."""
+        # Mock the session and client
+        mock_client = Mock()
+        mock_session.return_value.create_client.return_value = mock_client
+
+        custom_stream = "my-custom-stream"
+        client = CloudWatchLogClient("test-group", log_stream_name=custom_stream)
+        self.assertEqual(client.log_stream_name, custom_stream)
+
+    def test_send_log_batch_empty_batch_no_aws_call(self):
+        """Test sending an empty batch returns None and doesn't call AWS."""
+        batch = self.log_client._create_event_batch()
+        result = self.log_client._send_log_batch(batch)
+        self.assertIsNone(result)
+
+        # Verify put_log_events is not called for empty batch
+        self.log_client.logs_client.put_log_events.assert_not_called()
+
+    def test_validate_log_event_missing_timestamp(self):
+        """Test validation of log event with missing timestamp."""
+        log_event = {"message": "test message"}  # No timestamp
+        result = self.log_client._validate_log_event(log_event)
+
+        # Should be invalid - timestamp defaults to 0 which is too old
+        self.assertFalse(result)
+
+    def test_validate_log_event_invalid_timestamp_past(self):
+        """Test validation of log event with timestamp too far in the past."""
+        # Create timestamp older than 14 days
+        old_time = int(time.time() * 1000) - (15 * 24 * 60 * 60 * 1000)
+        log_event = {"message": "test message", "timestamp": old_time}
+
+        result = self.log_client._validate_log_event(log_event)
+        self.assertFalse(result)
+
+    def test_validate_log_event_invalid_timestamp_future(self):
+        """Test validation of log event with timestamp too far in the future."""
+        # Create timestamp more than 2 hours in the future
+        future_time = int(time.time() * 1000) + (3 * 60 * 60 * 1000)
+        log_event = {"message": "test message", "timestamp": future_time}
+
+        result = self.log_client._validate_log_event(log_event)
+        self.assertFalse(result)
+
+    def test_send_log_event_validation_failure(self):
+        """Test send_log_event when validation fails."""
+        # Create invalid event (empty message)
+        invalid_event = {"message": "", "timestamp": int(time.time() * 1000)}
+
+        # Mock put_log_events to track calls
+        self.log_client.logs_client.put_log_events.return_value = {"nextSequenceToken": "12345"}
+
+        # Send invalid event
+        self.log_client.send_log_event(invalid_event)
+
+        # Should not call put_log_events or create batch
+        self.log_client.logs_client.put_log_events.assert_not_called()
+        self.assertIsNone(self.log_client._event_batch)
+
+    def test_send_log_event_exception_handling(self):
+        """Test exception handling in send_log_event."""
+        # Mock _validate_log_event to raise an exception
+        with patch.object(self.log_client, "_validate_log_event", side_effect=Exception("Test error")):
+            log_event = {"message": "test", "timestamp": int(time.time() * 1000)}
+
+            with self.assertRaises(Exception) as context:
+                self.log_client.send_log_event(log_event)
+
+            self.assertEqual(str(context.exception), "Test error")
+
+    def test_flush_pending_events_no_batch(self):
+        """Test flush pending events when no batch exists."""
+        # Ensure no batch exists
+        self.log_client._event_batch = None
+
+        result = self.log_client.flush_pending_events()
+        self.assertTrue(result)
+
+        # Should not call send_log_batch
+        self.log_client.logs_client.put_log_events.assert_not_called()
+
+    def test_is_batch_active_edge_cases(self):
+        """Test edge cases for batch activity checking."""
+        batch = self.log_client._create_event_batch()
+        current_time = int(time.time() * 1000)
+
+        # Test exactly at 24 hour boundary (should still be active)
+        batch.add_event({"message": "test", "timestamp": current_time}, 10)
+        exactly_24h_future = current_time + (24 * 60 * 60 * 1000)
+        result = self.log_client._is_batch_active(batch, exactly_24h_future)
+        self.assertTrue(result)
+
+        # Test just over 24 hour boundary (should be inactive)
+        over_24h_future = current_time + (24 * 60 * 60 * 1000 + 1)
+        result = self.log_client._is_batch_active(batch, over_24h_future)
+        self.assertFalse(result)
+
+        # Test exactly at flush interval boundary
+        # Create a new batch for this test
+        batch2 = self.log_client._create_event_batch()
+        batch2.add_event({"message": "test", "timestamp": current_time}, 10)
+        batch2.created_timestamp_ms = current_time - self.log_client.BATCH_FLUSH_INTERVAL
+        result = self.log_client._is_batch_active(batch2, current_time)
+        self.assertFalse(result)
+
+
+if __name__ == "__main__":
+    unittest.main()