3535logger = logging .getLogger (__name__ )
3636
3737
38- class CloudWatchEMFExporter (MetricExporter ):
38+ class AwsCloudWatchEMFExporter (MetricExporter ):
3939 """
4040 OpenTelemetry metrics exporter for CloudWatch EMF format.
4141
4242 This exporter converts OTel metrics into CloudWatch EMF logs which are then
4343 sent to CloudWatch Logs. CloudWatch Logs automatically extracts the metrics
4444 from the EMF logs.
45+
46+ https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
47+
4548 """
4649
4750 # OTel to CloudWatch unit mapping
51+ # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188
4852 UNIT_MAPPING = {
53+ "1" : "" ,
54+ "ns" : "" ,
4955 "ms" : "Milliseconds" ,
5056 "s" : "Seconds" ,
5157 "us" : "Microseconds" ,
52- "ns" : "Nanoseconds" ,
5358 "By" : "Bytes" ,
54- "KiBy" : "Kilobytes" ,
55- "MiBy" : "Megabytes" ,
56- "GiBy" : "Gigabytes" ,
57- "TiBy" : "Terabytes" ,
58- "Bi" : "Bits" ,
59- "KiBi" : "Kilobits" ,
60- "MiBi" : "Megabits" ,
61- "GiBi" : "Gigabits" ,
62- "TiBi" : "Terabits" ,
63- "%" : "Percent" ,
64- "1" : "Count" ,
65- "{count}" : "Count" ,
59+ "bit" : "Bits" ,
6660 }
6761
6862 def __init__ (
@@ -102,6 +96,9 @@ def __init__(
10296 # Ensure log group exists
10397 self ._ensure_log_group_exists ()
10498
99+ # Ensure log stream exists
100+ self ._ensure_log_stream_exists ()
101+
105102 def _generate_log_stream_name (self ) -> str :
106103 """Generate a unique log stream name."""
107104
@@ -120,6 +117,17 @@ def _ensure_log_group_exists(self):
120117 logger .error ("Failed to create log group %s : %s" , self .log_group_name , error )
121118 raise
122119
120+ def _ensure_log_stream_exists (self ):
121+ try :
122+ self .logs_client .create_log_stream (logGroupName = self .log_group_name , logStreamName = self .log_stream_name )
123+ logger .info ("Created log stream: %s" , self .log_stream_name )
124+ except ClientError as error :
125+ if error .response .get ("Error" , {}).get ("Code" ) == "ResourceAlreadyExistsException" :
126+ logger .debug ("Log stream %s already exists" , self .log_stream_name )
127+ else :
128+ logger .error ("Failed to create log stream %s : %s" , self .log_stream_name , error )
129+ raise
130+
123131 def _get_metric_name (self , record : Any ) -> Optional [str ]:
124132 """Get the metric name from the metric record or data point."""
125133 # For compatibility with older record format
@@ -250,20 +258,23 @@ def _create_emf_log(self, metric_records: List[Any], resource: Resource, timesta
250258 emf_log ["Version" ] = "1"
251259
252260 # Add resource attributes to EMF log but not as dimensions
261+
253262 if resource and resource .attributes :
254263 for key , value in resource .attributes .items ():
255- emf_log [f"resource.{ key } " ] = str (value )
264+ emf_log [f"otel.resource.{ key } "] = str (value )
256265
257266 # Initialize collections for dimensions and metrics
258- all_attributes = {}
267+
259268 metric_definitions = []
260269
270+ # Collect attributes from all records (they should be the same for all records in the group)
271+ # Only collect once from the first record and apply to all records
272+ all_attributes = (metric_records [0 ].attributes
273+ if metric_records and hasattr (metric_records [0 ], "attributes" ) and metric_records [0 ].attributes
274+ else {})
275+
261276 # Process each metric record
262277 for record in metric_records :
263- # Collect attributes from all records (they should be the same for all records in the group)
264- if hasattr (record , "attributes" ) and record .attributes :
265- for key , value in record .attributes .items ():
266- all_attributes [key ] = value
267278
268279 metric_name = self ._get_metric_name (record )
269280
@@ -279,9 +290,11 @@ def _create_emf_log(self, metric_records: List[Any], resource: Resource, timesta
279290 metric_data ["Unit" ] = unit
280291
281292 # Process gauge metrics (only type supported in PR 1)
282- if hasattr (record , "value" ):
293+ if not hasattr (record , "value" ):
283294 # Store value directly in emf_log
284- emf_log [metric_name ] = record .value
295+ logger .debug ("Skipping metric %s as it does not have valid metric value" , metric_name )
296+
297+ emf_log [metric_name ] = record .value
285298
286299 # Add to metric definitions list
287300 metric_definitions .append ({"Name" : metric_name , ** metric_data })
@@ -307,6 +320,8 @@ def _send_log_event(self, log_event: Dict[str, Any]):
307320 Send a log event to CloudWatch Logs.
308321
309322 Basic implementation for PR 1 - sends individual events directly.
323+
324+ TODO: Batch events and follow CloudWatch Logs quota constraints - number of events & size limit per payload
310325 """
311326 try :
312327 # Send the log event
@@ -354,15 +369,16 @@ def export(
354369 if not (hasattr (metric , "data" ) and hasattr (metric .data , "data_points" )):
355370 continue
356371
357- # Process only Gauge metrics in PR 1
358- if isinstance (metric .data , Gauge ):
372+ # Process metrics based on type
373+ metric_type = type (metric .data )
374+ if metric_type == Gauge :
359375 for dp in metric .data .data_points :
360376 record , timestamp_ms = self ._convert_gauge (metric , dp )
361377 grouped_metrics [self ._group_by_attributes_and_timestamp (record , timestamp_ms )].append (
362378 record
363379 )
364380 else :
365- logger .warning ("Unsupported Metric Type: %s" , type ( metric . data ) )
381+ logger .debug ("Unsupported Metric Type: %s" , metric_type )
366382
367383 # Now process each group separately to create one EMF log per group
368384 for (_ , timestamp_ms ), metric_records in grouped_metrics .items ():
@@ -390,27 +406,31 @@ def force_flush(self, timeout_millis: int = 10000) -> bool:
390406 """
391407 Force flush any pending metrics.
392408
409+ TODO: will add logic to handle graceful shutdown
410+
393411 Args:
394412 timeout_millis: Timeout in milliseconds
395413
396414 Returns:
397415 True if successful, False otherwise
398416 """
399- logger .debug ("CloudWatchEMFExporter force flushes the buffered metrics" )
399- logger .debug ("CloudWatchEMFExporter force flushes the buffered metrics" )
417+ logger .debug ("AwsCloudWatchEMFExporter force flushes the buffered metrics" )
400418 return True
401419
402420 def shutdown (self , timeout_millis : Optional [int ] = None , ** kwargs : Any ) -> bool :
403421 """
404422 Shutdown the exporter.
405423 Override to handle timeout and other keyword arguments, but do nothing.
406424
425+ TODO: will add logic to handle graceful shutdown
426+
407427 Args:
408428 timeout_millis: Ignored timeout in milliseconds
409429 **kwargs: Ignored additional keyword arguments
410430 """
411431 # Intentionally do nothing
412432 self .force_flush (timeout_millis )
413- logger .debug ("CloudWatchEMFExporter shutdown called with timeout_millis=%s" , timeout_millis )
433+ logger .debug ("AwsCloudWatchEMFExporter shutdown called with timeout_millis=%s" , timeout_millis )
414434 return True
415435
416436
@@ -420,7 +440,7 @@ def create_emf_exporter(
420440 log_stream_name : Optional [str ] = None ,
421441 aws_region : Optional [str ] = None ,
422442 ** kwargs ,
423- ) -> CloudWatchEMFExporter :
443+ ) -> AwsCloudWatchEMFExporter :
424444 """
425445 Convenience function to create a CloudWatch EMF exporter with DELTA temporality.
426446
@@ -430,10 +450,10 @@ def create_emf_exporter(
430450 log_stream_name: CloudWatch log stream name (auto-generated if None)
431451 aws_region: AWS region (auto-detected if None)
432452 debug: Whether to enable debug printing of EMF logs
433- **kwargs: Additional arguments passed to the CloudWatchEMFExporter
453+ **kwargs: Additional arguments passed to the AwsCloudWatchEMFExporter
434454
435455 Returns:
436- Configured CloudWatchEMFExporter instance
456+ Configured AwsCloudWatchEMFExporter instance
437457 """
438458
439459 # Set up temporality preference - always use DELTA for CloudWatch
@@ -447,7 +467,7 @@ def create_emf_exporter(
447467 }
448468
449469 # Create and return the exporter
450- return CloudWatchEMFExporter (
470+ return AwsCloudWatchEMFExporter (
451471 namespace = namespace ,
452472 log_group_name = log_group_name ,
453473 log_stream_name = log_stream_name ,
0 commit comments