55
66import json
77import logging
8- import os
98import time
109import uuid
1110from collections import defaultdict
@@ -47,6 +46,38 @@ class AwsCloudWatchEMFExporter(MetricExporter):
4746
4847 """
4948
49+ # CloudWatch EMF supported units
50+ # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
51+ EMF_SUPPORTED_UNITS = {
52+ "Seconds" ,
53+ "Microseconds" ,
54+ "Milliseconds" ,
55+ "Bytes" ,
56+ "Kilobytes" ,
57+ "Megabytes" ,
58+ "Gigabytes" ,
59+ "Terabytes" ,
60+ "Bits" ,
61+ "Kilobits" ,
62+ "Megabits" ,
63+ "Gigabits" ,
64+ "Terabits" ,
65+ "Percent" ,
66+ "Count" ,
67+ "Bytes/Second" ,
68+ "Kilobytes/Second" ,
69+ "Megabytes/Second" ,
70+ "Gigabytes/Second" ,
71+ "Terabytes/Second" ,
72+ "Bits/Second" ,
73+ "Kilobits/Second" ,
74+ "Megabits/Second" ,
75+ "Gigabits/Second" ,
76+ "Terabits/Second" ,
77+ "Count/Second" ,
78+ "None" ,
79+ }
80+
5081 # OTel to CloudWatch unit mapping
5182 # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188
5283 UNIT_MAPPING = {
@@ -79,17 +110,23 @@ def __init__(
79110 preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality
80111 **kwargs: Additional arguments passed to botocore client
81112 """
113+ # Set up temporality preference default to DELTA if customers not set
114+ if preferred_temporality is None :
115+ preferred_temporality = {
116+ Counter : AggregationTemporality .DELTA ,
117+ Histogram : AggregationTemporality .DELTA ,
118+ ObservableCounter : AggregationTemporality .DELTA ,
119+ ObservableGauge : AggregationTemporality .DELTA ,
120+ ObservableUpDownCounter : AggregationTemporality .DELTA ,
121+ UpDownCounter : AggregationTemporality .DELTA ,
122+ }
123+
82124 super ().__init__ (preferred_temporality )
83125
84126 self .namespace = namespace
85127 self .log_group_name = log_group_name
86128 self .log_stream_name = log_stream_name or self ._generate_log_stream_name ()
87129
88- # Initialize CloudWatch Logs client using botocore
89- # If aws_region is not provided, botocore will check environment variables AWS_REGION or AWS_DEFAULT_REGION
90- if aws_region is None :
91- aws_region = os .environ .get ("AWS_REGION" ) or os .environ .get ("AWS_DEFAULT_REGION" )
92-
93130 session = botocore .session .Session ()
94131 self .logs_client = session .create_client ("logs" , region_name = aws_region , ** kwargs )
95132
@@ -99,6 +136,8 @@ def __init__(
99136 # Ensure log stream exists
100137 self ._ensure_log_stream_exists ()
101138
139+ # Default to unique log stream name matching OTel Collector
140+ # EMF Exporter behavior with language for source identification
102141 def _generate_log_stream_name (self ) -> str :
103142 """Generate a unique log stream name."""
104143
@@ -108,12 +147,12 @@ def _generate_log_stream_name(self) -> str:
108147 def _ensure_log_group_exists (self ):
109148 """Ensure the log group exists, create if it doesn't."""
110149 try :
111- self .logs_client .describe_log_groups ( logGroupNamePrefix = self .log_group_name , limit = 1 )
112- except ClientError :
113- try :
114- self . logs_client . create_log_group ( logGroupName = self . log_group_name )
115- logger .info ( "Created log group: %s" , self .log_group_name )
116- except ClientError as error :
150+ self .logs_client .create_log_group ( logGroupName = self .log_group_name )
151+ logger . info ( "Created log group: %s" , self . log_group_name )
152+ except ClientError as error :
153+ if error . response . get ( "Error" , {}). get ( "Code" ) == "ResourceAlreadyExistsException" :
154+ logger .debug ( "Log group %s already exists " , self .log_group_name )
155+ else :
117156 logger .error ("Failed to create log group %s : %s" , self .log_group_name , error )
118157 raise
119158
@@ -130,7 +169,7 @@ def _ensure_log_stream_exists(self):
130169
131170 def _get_metric_name (self , record : Any ) -> Optional [str ]:
132171 """Get the metric name from the metric record or data point."""
133- # For compatibility with older record format
172+
134173 if hasattr (record , "instrument" ) and hasattr (record .instrument , "name" ) and record .instrument .name :
135174 return record .instrument .name
136175 # Return None if no valid metric name found
@@ -147,7 +186,17 @@ def _get_unit(self, instrument_or_metric: Any) -> Optional[str]:
147186 if not unit :
148187 return None
149188
150- return self .UNIT_MAPPING .get (unit , unit )
189+ # First check if unit is already a supported EMF unit
190+ if unit in self .EMF_SUPPORTED_UNITS :
191+ return unit
192+
193+ # Otherwise, try to map from OTel unit to CloudWatch unit
194+ mapped_unit = self .UNIT_MAPPING .get (unit )
195+ if mapped_unit is not None :
196+ return mapped_unit
197+
198+ # If unit is not supported, return None
199+ return None
151200
152201 def _get_dimension_names (self , attributes : Dict [str , Any ]) -> List [str ]:
153202 """Extract dimension names from attributes."""
@@ -185,7 +234,11 @@ def _normalize_timestamp(self, timestamp_ns: int) -> int:
185234
186235 # pylint: disable=no-member
187236 def _create_metric_record (self , metric_name : str , metric_unit : str , metric_description : str ) -> Any :
188- """Create a base metric record with instrument information.
237+ """
238+ Creates the intermediate metric data structure that standardizes different otel metric representation
239+ and will be used to generate EMF events. The base record
240+ establishes the instrument schema (name/unit/description) that will be populated
241+ with dimensions, timestamps, and values during metric processing.
189242
190243 Args:
191244 metric_name: Name of the metric
@@ -255,6 +308,7 @@ def _create_emf_log(self, metric_records: List[Any], resource: Resource, timesta
255308 emf_log = {"_aws" : {"Timestamp" : timestamp or int (time .time () * 1000 ), "CloudWatchMetrics" : []}}
256309
257310 # Set with latest EMF version schema
311+ # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414
258312 emf_log ["Version" ] = "1"
259313
260314 # Add resource attributes to EMF log but not as dimensions
@@ -267,9 +321,7 @@ def _create_emf_log(self, metric_records: List[Any], resource: Resource, timesta
267321 emf_log [f"otel.resource.{ key } " ] = str (value )
268322
269323 # Initialize collections for dimensions and metrics
270-
271324 metric_definitions = []
272-
273325 # Collect attributes from all records (they should be the same for all records in the group)
274326 # Only collect once from the first record and apply to all records
275327 all_attributes = (
@@ -339,7 +391,7 @@ def _send_log_event(self, log_event: Dict[str, Any]):
339391 return response
340392
341393 except ClientError as error :
342- logger .error ("Failed to send log event: %s" , error )
394+ logger .debug ("Failed to send log event: %s" , error )
343395 raise
344396
345397 # pylint: disable=too-many-nested-blocks
@@ -438,46 +490,3 @@ def shutdown(self, timeout_millis: Optional[int] = None, **kwargs: Any) -> bool:
438490 self .force_flush (timeout_millis )
439491 logger .debug ("AwsCloudWatchEMFExporter shutdown called with timeout_millis=%s" , timeout_millis )
440492 return True
441-
442-
443- def create_emf_exporter (
444- namespace : str = "OTelPython" ,
445- log_group_name : str = "/aws/otel/python" ,
446- log_stream_name : Optional [str ] = None ,
447- aws_region : Optional [str ] = None ,
448- ** kwargs ,
449- ) -> AwsCloudWatchEMFExporter :
450- """
451- Convenience function to create a CloudWatch EMF exporter with DELTA temporality.
452-
453- Args:
454- namespace: CloudWatch namespace for metrics
455- log_group_name: CloudWatch log group name
456- log_stream_name: CloudWatch log stream name (auto-generated if None)
457- aws_region: AWS region (auto-detected if None)
458- debug: Whether to enable debug printing of EMF logs
459- **kwargs: Additional arguments passed to the AwsCloudWatchEMFExporter
460-
461- Returns:
462- Configured AwsCloudWatchEMFExporter instance
463- """
464-
465- # Set up temporality preference - always use DELTA for CloudWatch
466- temporality_dict = {
467- Counter : AggregationTemporality .DELTA ,
468- Histogram : AggregationTemporality .DELTA ,
469- ObservableCounter : AggregationTemporality .DELTA ,
470- ObservableGauge : AggregationTemporality .DELTA ,
471- ObservableUpDownCounter : AggregationTemporality .DELTA ,
472- UpDownCounter : AggregationTemporality .DELTA ,
473- }
474-
475- # Create and return the exporter
476- return AwsCloudWatchEMFExporter (
477- namespace = namespace ,
478- log_group_name = log_group_name ,
479- log_stream_name = log_stream_name ,
480- aws_region = aws_region ,
481- preferred_temporality = temporality_dict ,
482- ** kwargs ,
483- )
0 commit comments