@@ -47,6 +47,38 @@ class AwsCloudWatchEMFExporter(MetricExporter):
4747
4848 """
4949
50+ # CloudWatch EMF supported units
51+ # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
52+ EMF_SUPPORTED_UNITS = {
53+ "Seconds" ,
54+ "Microseconds" ,
55+ "Milliseconds" ,
56+ "Bytes" ,
57+ "Kilobytes" ,
58+ "Megabytes" ,
59+ "Gigabytes" ,
60+ "Terabytes" ,
61+ "Bits" ,
62+ "Kilobits" ,
63+ "Megabits" ,
64+ "Gigabits" ,
65+ "Terabits" ,
66+ "Percent" ,
67+ "Count" ,
68+ "Bytes/Second" ,
69+ "Kilobytes/Second" ,
70+ "Megabytes/Second" ,
71+ "Gigabytes/Second" ,
72+ "Terabytes/Second" ,
73+ "Bits/Second" ,
74+ "Kilobits/Second" ,
75+ "Megabits/Second" ,
76+ "Gigabits/Second" ,
77+ "Terabits/Second" ,
78+ "Count/Second" ,
79+ "None" ,
80+ }
81+
5082 # OTel to CloudWatch unit mapping
5183 # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188
5284 UNIT_MAPPING = {
@@ -79,17 +111,23 @@ def __init__(
79111 preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality
80112 **kwargs: Additional arguments passed to botocore client
81113 """
114+ # Set up temporality preference default to DELTA if customers not set
115+ if preferred_temporality is None :
116+ preferred_temporality = {
117+ Counter : AggregationTemporality .DELTA ,
118+ Histogram : AggregationTemporality .DELTA ,
119+ ObservableCounter : AggregationTemporality .DELTA ,
120+ ObservableGauge : AggregationTemporality .DELTA ,
121+ ObservableUpDownCounter : AggregationTemporality .DELTA ,
122+ UpDownCounter : AggregationTemporality .DELTA ,
123+ }
124+
82125 super ().__init__ (preferred_temporality )
83126
84127 self .namespace = namespace
85128 self .log_group_name = log_group_name
86129 self .log_stream_name = log_stream_name or self ._generate_log_stream_name ()
87130
88- # Initialize CloudWatch Logs client using botocore
89- # If aws_region is not provided, botocore will check environment variables AWS_REGION or AWS_DEFAULT_REGION
90- if aws_region is None :
91- aws_region = os .environ .get ("AWS_REGION" ) or os .environ .get ("AWS_DEFAULT_REGION" )
92-
93131 session = botocore .session .Session ()
94132 self .logs_client = session .create_client ("logs" , region_name = aws_region , ** kwargs )
95133
@@ -99,6 +137,8 @@ def __init__(
99137 # Ensure log stream exists
100138 self ._ensure_log_stream_exists ()
101139
140+ # Default to unique log stream name matching OTel Collector
141+ # EMF Exporter behavior with language for source identification
102142 def _generate_log_stream_name (self ) -> str :
103143 """Generate a unique log stream name."""
104144
@@ -108,12 +148,12 @@ def _generate_log_stream_name(self) -> str:
108148 def _ensure_log_group_exists (self ):
109149 """Ensure the log group exists, create if it doesn't."""
110150 try :
111- self .logs_client .describe_log_groups ( logGroupNamePrefix = self .log_group_name , limit = 1 )
112- except ClientError :
113- try :
114- self . logs_client . create_log_group ( logGroupName = self . log_group_name )
115- logger .info ( "Created log group: %s" , self .log_group_name )
116- except ClientError as error :
151+ self .logs_client .create_log_group ( logGroupName = self .log_group_name )
152+ logger . info ( "Created log group: %s" , self . log_group_name )
153+ except ClientError as error :
154+ if error . response . get ( "Error" , {}). get ( "Code" ) == "ResourceAlreadyExistsException" :
155+ logger .debug ( "Log group %s already exists " , self .log_group_name )
156+ else :
117157 logger .error ("Failed to create log group %s : %s" , self .log_group_name , error )
118158 raise
119159
@@ -130,7 +170,7 @@ def _ensure_log_stream_exists(self):
130170
131171 def _get_metric_name (self , record : Any ) -> Optional [str ]:
132172 """Get the metric name from the metric record or data point."""
133- # For compatibility with older record format
173+
134174 if hasattr (record , "instrument" ) and hasattr (record .instrument , "name" ) and record .instrument .name :
135175 return record .instrument .name
136176 # Return None if no valid metric name found
@@ -147,7 +187,17 @@ def _get_unit(self, instrument_or_metric: Any) -> Optional[str]:
147187 if not unit :
148188 return None
149189
150- return self .UNIT_MAPPING .get (unit , unit )
190+ # First check if unit is already a supported EMF unit
191+ if unit in self .EMF_SUPPORTED_UNITS :
192+ return unit
193+
194+ # Otherwise, try to map from OTel unit to CloudWatch unit
195+ mapped_unit = self .UNIT_MAPPING .get (unit )
196+ if mapped_unit is not None :
197+ return mapped_unit
198+
199+ # If unit is not supported, return None
200+ return None
151201
152202 def _get_dimension_names (self , attributes : Dict [str , Any ]) -> List [str ]:
153203 """Extract dimension names from attributes."""
@@ -185,7 +235,11 @@ def _normalize_timestamp(self, timestamp_ns: int) -> int:
185235
186236 # pylint: disable=no-member
187237 def _create_metric_record (self , metric_name : str , metric_unit : str , metric_description : str ) -> Any :
188- """Create a base metric record with instrument information.
238+ """
239+ Creates the intermediate metric data structure that standardizes different otel metric representation
240+ and will be used to generate EMF events. The base record
241+ establishes the instrument schema (name/unit/description) that will be populated
242+ with dimensions, timestamps, and values during metric processing.
189243
190244 Args:
191245 metric_name: Name of the metric
@@ -255,6 +309,7 @@ def _create_emf_log(self, metric_records: List[Any], resource: Resource, timesta
255309 emf_log = {"_aws" : {"Timestamp" : timestamp or int (time .time () * 1000 ), "CloudWatchMetrics" : []}}
256310
257311 # Set with latest EMF version schema
312+ # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414
258313 emf_log ["Version" ] = "1"
259314
260315 # Add resource attributes to EMF log but not as dimensions
@@ -267,9 +322,7 @@ def _create_emf_log(self, metric_records: List[Any], resource: Resource, timesta
267322 emf_log [f"otel.resource.{ key } " ] = str (value )
268323
269324 # Initialize collections for dimensions and metrics
270-
271325 metric_definitions = []
272-
273326 # Collect attributes from all records (they should be the same for all records in the group)
274327 # Only collect once from the first record and apply to all records
275328 all_attributes = (
@@ -339,7 +392,7 @@ def _send_log_event(self, log_event: Dict[str, Any]):
339392 return response
340393
341394 except ClientError as error :
342- logger .error ("Failed to send log event: %s" , error )
395+ logger .debug ("Failed to send log event: %s" , error )
343396 raise
344397
345398 # pylint: disable=too-many-nested-blocks
@@ -438,46 +491,3 @@ def shutdown(self, timeout_millis: Optional[int] = None, **kwargs: Any) -> bool:
438491 self .force_flush (timeout_millis )
439492 logger .debug ("AwsCloudWatchEMFExporter shutdown called with timeout_millis=%s" , timeout_millis )
440493 return True
441-
442-
443- def create_emf_exporter (
444- namespace : str = "OTelPython" ,
445- log_group_name : str = "/aws/otel/python" ,
446- log_stream_name : Optional [str ] = None ,
447- aws_region : Optional [str ] = None ,
448- ** kwargs ,
449- ) -> AwsCloudWatchEMFExporter :
450- """
451- Convenience function to create a CloudWatch EMF exporter with DELTA temporality.
452-
453- Args:
454- namespace: CloudWatch namespace for metrics
455- log_group_name: CloudWatch log group name
456- log_stream_name: CloudWatch log stream name (auto-generated if None)
457- aws_region: AWS region (auto-detected if None)
458- debug: Whether to enable debug printing of EMF logs
459- **kwargs: Additional arguments passed to the AwsCloudWatchEMFExporter
460-
461- Returns:
462- Configured AwsCloudWatchEMFExporter instance
463- """
464-
465- # Set up temporality preference - always use DELTA for CloudWatch
466- temporality_dict = {
467- Counter : AggregationTemporality .DELTA ,
468- Histogram : AggregationTemporality .DELTA ,
469- ObservableCounter : AggregationTemporality .DELTA ,
470- ObservableGauge : AggregationTemporality .DELTA ,
471- ObservableUpDownCounter : AggregationTemporality .DELTA ,
472- UpDownCounter : AggregationTemporality .DELTA ,
473- }
474-
475- # Create and return the exporter
476- return AwsCloudWatchEMFExporter (
477- namespace = namespace ,
478- log_group_name = log_group_name ,
479- log_stream_name = log_stream_name ,
480- aws_region = aws_region ,
481- preferred_temporality = temporality_dict ,
482- ** kwargs ,
483- )
0 commit comments