logger = logging.getLogger(__name__)


-class CloudWatchEMFExporter(MetricExporter):
+class AwsCloudWatchEMFExporter(MetricExporter):
    """
    OpenTelemetry metrics exporter for CloudWatch EMF format.

    This exporter converts OTel metrics into CloudWatch EMF logs which are then
    sent to CloudWatch Logs. CloudWatch Logs automatically extracts the metrics
    from the EMF logs.
+
+    https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
+
    """

    # OTel to CloudWatch unit mapping
+    # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188
    UNIT_MAPPING = {
+        "1": "",
+        "ns": "",
        "ms": "Milliseconds",
        "s": "Seconds",
        "us": "Microseconds",
-        "ns": "Nanoseconds",
        "By": "Bytes",
-        "KiBy": "Kilobytes",
-        "MiBy": "Megabytes",
-        "GiBy": "Gigabytes",
-        "TiBy": "Terabytes",
-        "Bi": "Bits",
-        "KiBi": "Kilobits",
-        "MiBi": "Megabits",
-        "GiBi": "Gigabits",
-        "TiBi": "Terabits",
-        "%": "Percent",
-        "1": "Count",
-        "{count}": "Count",
+        "bit": "Bits",
    }

    def __init__(
@@ -102,6 +96,9 @@ def __init__(
        # Ensure log group exists
        self._ensure_log_group_exists()

+        # Ensure log stream exists
+        self._ensure_log_stream_exists()
+
    def _generate_log_stream_name(self) -> str:
        """Generate a unique log stream name."""

@@ -120,6 +117,17 @@ def _ensure_log_group_exists(self):
                logger.error("Failed to create log group %s : %s", self.log_group_name, error)
                raise
+    def _ensure_log_stream_exists(self):
+        try:
+            self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
+            logger.info("Created log stream: %s", self.log_stream_name)
+        except ClientError as error:
+            if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
+                logger.debug("Log stream %s already exists", self.log_stream_name)
+            else:
+                logger.error("Failed to create log stream %s : %s", self.log_stream_name, error)
+                raise
+
    def _get_metric_name(self, record: Any) -> Optional[str]:
        """Get the metric name from the metric record or data point."""
        # For compatibility with older record format
@@ -250,20 +258,28 @@ def _create_emf_log(self, metric_records: List[Any], resource: Resource, timesta
        emf_log["Version"] = "1"

        # Add resource attributes to EMF log but not as dimensions
+        # OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes
+        # to regular metric attributes (potential dimensions). For this SDK EMF implementation, however,
+        # we align with the OpenTelemetry concept that all metric attributes are treated as dimensions,
+        # and resource attributes are kept as additional EMF metadata, prefixed with "otel.resource." to distinguish them.
        if resource and resource.attributes:
            for key, value in resource.attributes.items():
-                emf_log[f"resource.{key}"] = str(value)
+                emf_log[f"otel.resource.{key}"] = str(value)

        # Initialize collections for dimensions and metrics
-        all_attributes = {}
+
        metric_definitions = []

+        # Attributes should be the same for all records in the group,
+        # so collect them once from the first record and apply them to all records
+        all_attributes = (
+            metric_records[0].attributes
+            if metric_records and hasattr(metric_records[0], "attributes") and metric_records[0].attributes
+            else {}
+        )
+
        # Process each metric record
        for record in metric_records:
-            # Collect attributes from all records (they should be the same for all records in the group)
-            if hasattr(record, "attributes") and record.attributes:
-                for key, value in record.attributes.items():
-                    all_attributes[key] = value

            metric_name = self._get_metric_name(record)

@@ -279,9 +295,11 @@ def _create_emf_log(self, metric_records: List[Any], resource: Resource, timesta
                metric_data["Unit"] = unit

            # Process gauge metrics (only type supported in PR 1)
-            if hasattr(record, "value"):
-                # Store value directly in emf_log
-                emf_log[metric_name] = record.value
+            if not hasattr(record, "value"):
+                logger.debug("Skipping metric %s as it does not have valid metric value", metric_name)
+                continue
+
+            # Store value directly in emf_log
+            emf_log[metric_name] = record.value

            # Add to metric definitions list
            metric_definitions.append({"Name": metric_name, **metric_data})
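For reference, the EMF log that _create_emf_log assembles for a single gauge data point ends up shaped roughly like the Python dict below. This is an illustrative sketch with made-up names and values; the "_aws" / "CloudWatchMetrics" envelope follows the EMF specification linked in the class docstring, and filling it in from metric_definitions and all_attributes happens in a part of the function not shown in this diff.

{
    "_aws": {
        "Timestamp": 1718000000000,
        "CloudWatchMetrics": [
            {
                "Namespace": "MyApplication",
                "Dimensions": [["service.name"]],  # dimension keys come from all_attributes
                "Metrics": [{"Name": "cpu.utilization", "Unit": ""}],  # from metric_definitions
            }
        ],
    },
    "Version": "1",
    "otel.resource.telemetry.sdk.language": "python",  # resource attributes, metadata only
    "service.name": "my-service",  # metric attribute, also emitted as a dimension value
    "cpu.utilization": 0.42,  # gauge value stored directly under the metric name
}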
@@ -307,6 +325,8 @@ def _send_log_event(self, log_event: Dict[str, Any]):
        Send a log event to CloudWatch Logs.

        Basic implementation for PR 1 - sends individual events directly.
+
+        TODO: Batch events and follow CloudWatch Logs quota constraints - number of events & size limit per payload
        """
        try:
            # Send the log event
@@ -354,15 +374,16 @@ def export(
                    if not (hasattr(metric, "data") and hasattr(metric.data, "data_points")):
                        continue

-                    # Process only Gauge metrics in PR 1
-                    if isinstance(metric.data, Gauge):
+                    # Process metrics based on type
+                    metric_type = type(metric.data)
+                    if metric_type == Gauge:
                        for dp in metric.data.data_points:
                            record, timestamp_ms = self._convert_gauge(metric, dp)
                            grouped_metrics[self._group_by_attributes_and_timestamp(record, timestamp_ms)].append(
                                record
                            )
                    else:
-                        logger.warning("Unsupported Metric Type: %s", type(metric.data))
+                        logger.debug("Unsupported Metric Type: %s", metric_type)

                # Now process each group separately to create one EMF log per group
                for (_, timestamp_ms), metric_records in grouped_metrics.items():
@@ -390,27 +411,31 @@ def force_flush(self, timeout_millis: int = 10000) -> bool:
        """
        Force flush any pending metrics.

+        TODO: add logic to handle graceful shutdown
+
        Args:
            timeout_millis: Timeout in milliseconds

        Returns:
            True if successful, False otherwise
        """
-        logger.debug("CloudWatchEMFExporter force flushes the buffered metrics")
+        logger.debug("AwsCloudWatchEMFExporter force flushes the buffered metrics")
        return True

    def shutdown(self, timeout_millis: Optional[int] = None, **kwargs: Any) -> bool:
        """
        Shutdown the exporter.
        Override to handle timeout and other keyword arguments, but do nothing.

+        TODO: add logic to handle graceful shutdown
+
        Args:
            timeout_millis: Ignored timeout in milliseconds
            **kwargs: Ignored additional keyword arguments
        """
        # Intentionally do nothing
        self.force_flush(timeout_millis)
-        logger.debug("CloudWatchEMFExporter shutdown called with timeout_millis=%s", timeout_millis)
+        logger.debug("AwsCloudWatchEMFExporter shutdown called with timeout_millis=%s", timeout_millis)
        return True


@@ -420,7 +445,7 @@ def create_emf_exporter(
    log_stream_name: Optional[str] = None,
    aws_region: Optional[str] = None,
    **kwargs,
-) -> CloudWatchEMFExporter:
+) -> AwsCloudWatchEMFExporter:
    """
    Convenience function to create a CloudWatch EMF exporter with DELTA temporality.

@@ -430,10 +455,10 @@ def create_emf_exporter(
        log_stream_name: CloudWatch log stream name (auto-generated if None)
        aws_region: AWS region (auto-detected if None)
        debug: Whether to enable debug printing of EMF logs
-        **kwargs: Additional arguments passed to the CloudWatchEMFExporter
+        **kwargs: Additional arguments passed to the AwsCloudWatchEMFExporter

    Returns:
-        Configured CloudWatchEMFExporter instance
+        Configured AwsCloudWatchEMFExporter instance
    """

    # Set up temporality preference - always use DELTA for CloudWatch
@@ -447,7 +472,7 @@ def create_emf_exporter(
    }

    # Create and return the exporter
-    return CloudWatchEMFExporter(
+    return AwsCloudWatchEMFExporter(
        namespace=namespace,
        log_group_name=log_group_name,
        log_stream_name=log_stream_name,
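To show how the pieces above fit together, here is a minimal usage sketch (not part of the diff). It assumes create_emf_exporter is importable from the module this diff adds (the import path is not visible in this excerpt); the OpenTelemetry SDK calls are the standard metrics API.

from opentelemetry import metrics
from opentelemetry.metrics import Observation
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

# create_emf_exporter comes from the module shown in this diff (import omitted here).
exporter = create_emf_exporter(
    namespace="MyApplication",
    log_group_name="/metrics/my-application",
)

# Export every 60s; each batch of gauge data points becomes EMF log events
# in the configured log group and stream.
reader = PeriodicExportingMetricReader(exporter, export_interval_millis=60_000)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

meter = metrics.get_meter("example.meter")

def observe_cpu(_options):
    # Gauge is the only metric type handled in this PR.
    return [Observation(0.42, {"service.name": "my-service"})]

meter.create_observable_gauge("cpu.utilization", callbacks=[observe_cpu], unit="1")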