Skip to content

Commit 51e16e6

Browse files
authored
Extend CloudWatch EMF exporter with Sum and Histogram support (#409)
*Description of changes:* Extend CloudWatch EMF exporter with Sum, Histogram, and ExponentialHistogram support Add support for additional metric types beyond Gauge: - Sum metrics - Histogram metrics with exponential histogram - ExponentialHistogram to SEH - CW Logs batching and validation By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent e1149f5 commit 51e16e6

File tree

4 files changed

+1438
-293
lines changed

4 files changed

+1438
-293
lines changed
Lines changed: 381 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
# pylint: disable=no-self-use
5+
6+
import logging
7+
import time
8+
import uuid
9+
from typing import Any, Dict, List, Optional
10+
11+
import botocore.session
12+
from botocore.exceptions import ClientError
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class LogEventBatch:
    """
    Container for a batch of CloudWatch log events with metadata.

    Tracks the buffered log events, their total byte size, and the
    minimum/maximum event timestamps so callers can enforce CloudWatch
    Logs batching constraints.
    """

    def __init__(self):
        """Start with no events, zero bytes, and a fresh creation time."""
        self.log_events: List[Dict[str, Any]] = []
        self.byte_total: int = 0
        self.min_timestamp_ms: int = 0
        self.max_timestamp_ms: int = 0
        self.created_timestamp_ms: int = int(time.time() * 1000)

    def add_event(self, log_event: Dict[str, Any], event_size: int) -> None:
        """
        Append a log event and fold its size and timestamp into the batch totals.

        Args:
            log_event: The log event to add
            event_size: The byte size of the event
        """
        self.log_events.append(log_event)
        self.byte_total += event_size

        # Track the min/max timestamps seen so far; 0 means "no events yet",
        # so the first event's timestamp is adopted outright.
        ts = log_event.get("timestamp", 0)
        if not self.min_timestamp_ms or ts < self.min_timestamp_ms:
            self.min_timestamp_ms = ts
        self.max_timestamp_ms = max(self.max_timestamp_ms, ts)

    def is_empty(self) -> bool:
        """Return True when no events have been added."""
        return not self.log_events

    def size(self) -> int:
        """Return how many events are currently buffered."""
        return len(self.log_events)

    def clear(self) -> None:
        """Drop all buffered events and reset counters for reuse."""
        self.log_events.clear()
        self.byte_total = 0
        self.min_timestamp_ms = 0
        self.max_timestamp_ms = 0
        self.created_timestamp_ms = int(time.time() * 1000)
66+
67+
68+
class CloudWatchLogClient:
    """
    CloudWatch Logs client for batching and sending log events.

    Buffers EMF log events into batches that respect CloudWatch Logs
    constraints (request size, event count, timestamp span) and ships them
    via PutLogEvents, creating the log group/stream lazily on the first
    ResourceNotFoundException.
    """

    # Constants for CloudWatch Logs limits
    # http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
    # http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
    CW_MAX_EVENT_PAYLOAD_BYTES = 256 * 1024  # 256KB per event, including per-event header
    CW_MAX_REQUEST_EVENT_COUNT = 10000  # max events per PutLogEvents request
    CW_PER_EVENT_HEADER_BYTES = 26  # fixed overhead CloudWatch charges per event
    BATCH_FLUSH_INTERVAL = 60 * 1000  # flush a batch after 60s regardless of size
    CW_MAX_REQUEST_PAYLOAD_BYTES = 1 * 1024 * 1024  # 1MB per request
    CW_TRUNCATED_SUFFIX = "[Truncated...]"  # appended when an oversized message is cut
    # None of the log events in the batch can be older than 14 days
    CW_EVENT_TIMESTAMP_LIMIT_PAST = 14 * 24 * 60 * 60 * 1000
    # None of the log events in the batch can be more than 2 hours in the future.
    CW_EVENT_TIMESTAMP_LIMIT_FUTURE = 2 * 60 * 60 * 1000

    def __init__(
        self,
        log_group_name: str,
        log_stream_name: Optional[str] = None,
        aws_region: Optional[str] = None,
        **kwargs,
    ):
        """
        Initialize the CloudWatch Logs client.

        Args:
            log_group_name: CloudWatch log group name
            log_stream_name: CloudWatch log stream name (auto-generated if None)
            aws_region: AWS region (auto-detected if None)
            **kwargs: Additional arguments passed to botocore client
        """
        self.log_group_name = log_group_name
        self.log_stream_name = log_stream_name or self._generate_log_stream_name()

        session = botocore.session.Session()
        self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs)

        # Event batch buffering logs before sending to CloudWatch;
        # created lazily on the first send_log_event() call.
        self._event_batch = None

    def _generate_log_stream_name(self) -> str:
        """Generate a unique log stream name with a short random suffix."""
        unique_id = str(uuid.uuid4())[:8]
        return f"otel-python-{unique_id}"

    def _create_log_group_if_needed(self) -> None:
        """Create the log group, treating "already exists" as success.

        Raises:
            ClientError: For any failure other than ResourceAlreadyExistsException.
        """
        try:
            self.logs_client.create_log_group(logGroupName=self.log_group_name)
            logger.info("Created log group: %s", self.log_group_name)
        except ClientError as error:
            if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
                logger.debug("Log group %s already exists", self.log_group_name)
            else:
                logger.error("Failed to create log group %s : %s", self.log_group_name, error)
                raise

    def _create_log_stream_if_needed(self) -> None:
        """Create the log stream, treating "already exists" as success.

        Raises:
            ClientError: For any failure other than ResourceAlreadyExistsException.
        """
        try:
            self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
            logger.info("Created log stream: %s", self.log_stream_name)
        except ClientError as error:
            if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
                logger.debug("Log stream %s already exists", self.log_stream_name)
            else:
                logger.error("Failed to create log stream %s : %s", self.log_stream_name, error)
                raise

    def _validate_log_event(self, log_event: Dict[str, Any]) -> bool:
        """
        Validate the log event according to CloudWatch Logs constraints.
        Implements the same validation logic as the Go version.

        Oversized messages are truncated in place (with CW_TRUNCATED_SUFFIX
        appended); empty messages and out-of-window timestamps are rejected.

        Args:
            log_event: The log event to validate

        Returns:
            bool: True if valid, False otherwise
        """

        # Check empty message
        if not log_event.get("message") or not log_event.get("message").strip():
            logger.error("Empty log event message")
            return False

        message = log_event.get("message", "")
        timestamp = log_event.get("timestamp", 0)

        # Check message size.
        # NOTE(review): len(message) counts characters, while CloudWatch limits
        # are in bytes — multi-byte UTF-8 messages are under-counted here.
        # TODO: confirm whether byte-accurate sizing/truncation is required.
        message_size = len(message) + self.CW_PER_EVENT_HEADER_BYTES
        if message_size > self.CW_MAX_EVENT_PAYLOAD_BYTES:
            logger.warning(
                "Log event size %s exceeds maximum allowed size %s. Truncating.",
                message_size,
                self.CW_MAX_EVENT_PAYLOAD_BYTES,
            )
            max_message_size = (
                self.CW_MAX_EVENT_PAYLOAD_BYTES - self.CW_PER_EVENT_HEADER_BYTES - len(self.CW_TRUNCATED_SUFFIX)
            )
            log_event["message"] = message[:max_message_size] + self.CW_TRUNCATED_SUFFIX

        # Check timestamp constraints
        current_time = int(time.time() * 1000)  # Current time in milliseconds
        event_time = timestamp

        # Calculate the time difference
        time_diff = current_time - event_time

        # Check if too old or too far in the future
        if time_diff > self.CW_EVENT_TIMESTAMP_LIMIT_PAST or time_diff < -self.CW_EVENT_TIMESTAMP_LIMIT_FUTURE:
            logger.error(
                "Log event timestamp %s is either older than 14 days or more than 2 hours in the future. "
                "Current time: %s",
                event_time,
                current_time,
            )
            return False

        return True

    def _create_event_batch(self) -> LogEventBatch:
        """
        Create a new log event batch.

        Returns:
            LogEventBatch: A new event batch
        """
        return LogEventBatch()

    def _event_batch_exceeds_limit(self, batch: LogEventBatch, next_event_size: int) -> bool:
        """
        Check if adding the next event would exceed CloudWatch Logs limits
        (max event count per request or max request payload bytes).

        Args:
            batch: The current batch
            next_event_size: Size of the next event in bytes

        Returns:
            bool: True if adding the next event would exceed limits
        """
        return (
            batch.size() >= self.CW_MAX_REQUEST_EVENT_COUNT
            or batch.byte_total + next_event_size > self.CW_MAX_REQUEST_PAYLOAD_BYTES
        )

    def _is_batch_active(self, batch: LogEventBatch, target_timestamp_ms: int) -> bool:
        """
        Check whether the batch can still accept an event with the given
        timestamp: the batch must not span more than 24 hours and must be
        younger than BATCH_FLUSH_INTERVAL.

        Args:
            batch: The event batch
            target_timestamp_ms: The timestamp of the event to add

        Returns:
            bool: True if the batch is active and can accept the event
        """
        # New log event batch: no timestamps recorded yet
        if batch.min_timestamp_ms == 0 or batch.max_timestamp_ms == 0:
            return True

        # Check if adding the event would make the batch span more than 24 hours
        if target_timestamp_ms - batch.min_timestamp_ms > 24 * 3600 * 1000:
            return False

        if batch.max_timestamp_ms - target_timestamp_ms > 24 * 3600 * 1000:
            return False

        # Flush the event batch when it reaches the 60s interval
        current_time = int(time.time() * 1000)
        if current_time - batch.created_timestamp_ms >= self.BATCH_FLUSH_INTERVAL:
            return False

        return True

    def _sort_log_events(self, batch: LogEventBatch) -> None:
        """
        Sort log events in the batch by timestamp, ascending, as required
        by the PutLogEvents API.

        Args:
            batch: The event batch
        """
        batch.log_events = sorted(batch.log_events, key=lambda x: x["timestamp"])

    def _send_log_batch(self, batch: LogEventBatch) -> Optional[Dict[str, Any]]:
        """
        Send a batch of log events to CloudWatch Logs.
        Creates log group and stream lazily if they don't exist.

        Args:
            batch: The event batch

        Returns:
            The PutLogEvents API response, or None if the batch was empty.

        Raises:
            ClientError: If the send fails for a reason other than missing
                resources, or still fails after creating them.
        """
        if batch.is_empty():
            return None

        # Sort log events by timestamp
        self._sort_log_events(batch)

        # Prepare the PutLogEvents request
        put_log_events_input = {
            "logGroupName": self.log_group_name,
            "logStreamName": self.log_stream_name,
            "logEvents": batch.log_events,
        }

        start_time = time.time()

        try:
            # Make the PutLogEvents call
            response = self.logs_client.put_log_events(**put_log_events_input)

            elapsed_ms = int((time.time() - start_time) * 1000)
            logger.debug(
                "Successfully sent %s log events (%s KB) in %s ms",
                batch.size(),
                batch.byte_total / 1024,
                elapsed_ms,
            )

            return response

        except ClientError as error:
            # Handle resource not found errors by creating log group/stream
            error_code = error.response.get("Error", {}).get("Code")
            if error_code == "ResourceNotFoundException":
                logger.info("Log group or stream not found, creating resources and retrying")

                try:
                    # Create log group first
                    self._create_log_group_if_needed()
                    # Then create log stream
                    self._create_log_stream_if_needed()

                    # Retry the PutLogEvents call
                    response = self.logs_client.put_log_events(**put_log_events_input)

                    elapsed_ms = int((time.time() - start_time) * 1000)
                    logger.debug(
                        "Successfully sent %s log events (%s KB) in %s ms after creating resources",
                        batch.size(),
                        batch.byte_total / 1024,
                        elapsed_ms,
                    )

                    return response

                except ClientError as retry_error:
                    logger.error("Failed to send log events after creating resources: %s", retry_error)
                    raise
            else:
                logger.error("Failed to send log events: %s", error)
                raise

    def send_log_event(self, log_event: Dict[str, Any]):
        """
        Send a log event to CloudWatch Logs.

        This function implements the same logic as the Go version in the OTel Collector.
        It batches log events according to CloudWatch Logs constraints and sends them
        when the batch is full or spans more than 24 hours.

        Args:
            log_event: The log event to send (dict with "message" and "timestamp")

        Raises:
            Exception: Re-raises any error encountered while batching/sending.
        """
        try:
            # Validate the log event; invalid events are dropped (already logged)
            if not self._validate_log_event(log_event):
                return

            # Calculate event size.
            # NOTE(review): character count, not byte count — see _validate_log_event.
            event_size = len(log_event["message"]) + self.CW_PER_EVENT_HEADER_BYTES

            # Initialize event batch if needed
            if self._event_batch is None:
                self._event_batch = self._create_event_batch()

            # Check if we need to send the current batch and create a new one
            current_batch = self._event_batch
            if self._event_batch_exceeds_limit(current_batch, event_size) or not self._is_batch_active(
                current_batch, log_event["timestamp"]
            ):
                # Send the current batch
                self._send_log_batch(current_batch)
                # Create a new batch
                self._event_batch = self._create_event_batch()
                current_batch = self._event_batch

            # Add the log event to the batch
            current_batch.add_event(log_event, event_size)

        except Exception as error:
            logger.error("Failed to process log event: %s", error)
            raise

    def flush_pending_events(self) -> bool:
        """
        Flush any pending log events.

        Returns:
            bool: True once flushing has completed (a no-op when the batch is
            empty); send failures propagate as exceptions rather than False.
        """
        if self._event_batch is not None and not self._event_batch.is_empty():
            current_batch = self._event_batch
            self._send_log_batch(current_batch)
            self._event_batch = self._create_event_batch()
            logger.debug("CloudWatchLogClient flushed the buffered log events")
        return True

0 commit comments

Comments
 (0)