Commit 88bf900
Extend CloudWatch EMF exporter with Sum, Histogram, and ExponentialHistogram support
1 parent 69b7ea0 commit 88bf900

File tree

4 files changed: +1244 -293 lines changed
Lines changed: 354 additions & 0 deletions
@@ -0,0 +1,354 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

# pylint: disable=no-self-use

import logging
import time
import uuid
from typing import Any, Dict, Optional

import botocore.session
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CloudWatchLogClient:
    """
    CloudWatch Logs client for batching and sending log events.

    This class handles the batching logic and CloudWatch Logs API interactions
    for sending EMF logs efficiently while respecting CloudWatch Logs constraints.
    """

    # Constants for CloudWatch Logs limits
    # http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
    CW_MAX_EVENT_PAYLOAD_BYTES = 256 * 1024  # 256KB
    CW_MAX_REQUEST_EVENT_COUNT = 10000
    CW_PER_EVENT_HEADER_BYTES = 26
    BATCH_FLUSH_INTERVAL = 60 * 1000  # 60 seconds, in milliseconds
    CW_MAX_REQUEST_PAYLOAD_BYTES = 1 * 1024 * 1024  # 1MB
    CW_TRUNCATED_SUFFIX = "[Truncated...]"
    # None of the log events in the batch can be older than 14 days
    CW_EVENT_TIMESTAMP_LIMIT_PAST = 14 * 24 * 60 * 60 * 1000
    # None of the log events in the batch can be more than 2 hours in the future.
    CW_EVENT_TIMESTAMP_LIMIT_FUTURE = 2 * 60 * 60 * 1000
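
    # Scale of the timestamp window above, derived from the two constants:
    # CW_EVENT_TIMESTAMP_LIMIT_PAST = 14 * 24 * 60 * 60 * 1000 = 1,209,600,000 ms,
    # CW_EVENT_TIMESTAMP_LIMIT_FUTURE = 2 * 60 * 60 * 1000 = 7,200,000 ms.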

    def __init__(
        self,
        log_group_name: str,
        log_stream_name: Optional[str] = None,
        aws_region: Optional[str] = None,
        **kwargs,
    ):
        """
        Initialize the CloudWatch Logs client.

        Args:
            log_group_name: CloudWatch log group name
            log_stream_name: CloudWatch log stream name (auto-generated if None)
            aws_region: AWS region (auto-detected if None)
            **kwargs: Additional arguments passed to the botocore client
        """
        self.log_group_name = log_group_name
        self.log_stream_name = log_stream_name or self._generate_log_stream_name()

        session = botocore.session.Session()
        self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs)

        # Event batch to store logs before sending to CloudWatch
        self._event_batch = None

    def _generate_log_stream_name(self) -> str:
        """Generate a unique log stream name."""
        unique_id = str(uuid.uuid4())[:8]
        return f"otel-python-{unique_id}"

    def _create_log_group_if_needed(self):
        """Create the log group if it doesn't exist."""
        try:
            self.logs_client.create_log_group(logGroupName=self.log_group_name)
            logger.info("Created log group: %s", self.log_group_name)
        except ClientError as error:
            if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
                logger.debug("Log group %s already exists", self.log_group_name)
            else:
                logger.error("Failed to create log group %s: %s", self.log_group_name, error)
                raise

    def _create_log_stream_if_needed(self):
        """Create the log stream if it doesn't exist."""
        try:
            self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
            logger.info("Created log stream: %s", self.log_stream_name)
        except ClientError as error:
            if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
                logger.debug("Log stream %s already exists", self.log_stream_name)
            else:
                logger.error("Failed to create log stream %s: %s", self.log_stream_name, error)
                raise

    def _validate_log_event(self, log_event: Dict) -> bool:
        """
        Validate the log event according to CloudWatch Logs constraints.
        Implements the same validation logic as the Go version.

        Args:
            log_event: The log event to validate

        Returns:
            bool: True if valid, False otherwise
        """
        # Check empty message
        if not log_event.get("message"):
            logger.error("Empty log event message")
            return False

        message = log_event.get("message", "")
        timestamp = log_event.get("timestamp", 0)

        # Check message size
        message_size = len(message) + self.CW_PER_EVENT_HEADER_BYTES
        if message_size > self.CW_MAX_EVENT_PAYLOAD_BYTES:
            logger.warning(
                "Log event size %s exceeds maximum allowed size %s. Truncating.",
                message_size,
                self.CW_MAX_EVENT_PAYLOAD_BYTES,
            )
            max_message_size = (
                self.CW_MAX_EVENT_PAYLOAD_BYTES - self.CW_PER_EVENT_HEADER_BYTES - len(self.CW_TRUNCATED_SUFFIX)
            )
            log_event["message"] = message[:max_message_size] + self.CW_TRUNCATED_SUFFIX
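
        # Worked example of the truncation arithmetic above, derived from the
        # class constants: max_message_size = 262144 - 26 - len("[Truncated...]")
        # = 262144 - 26 - 14 = 262104, so an oversized message keeps its first
        # 262104 characters before the suffix is appended.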

        # Check timestamp constraints
        current_time = int(time.time() * 1000)  # Current time in milliseconds
        event_time = timestamp

        # Calculate the time difference
        time_diff = current_time - event_time

        # Check if too old or too far in the future
        if time_diff > self.CW_EVENT_TIMESTAMP_LIMIT_PAST or time_diff < -self.CW_EVENT_TIMESTAMP_LIMIT_FUTURE:
            logger.error(
                "Log event timestamp %s is either older than 14 days or more than 2 hours in the future. "
                "Current time: %s",
                event_time,
                current_time,
            )
            return False

        return True

    def _create_event_batch(self) -> Dict:
        """
        Create a new log event batch.

        Returns:
            Dict: A new event batch
        """
        return {
            "logEvents": [],
            "byteTotal": 0,
            "minTimestampMs": 0,
            "maxTimestampMs": 0,
            "createdTimestampMs": int(time.time() * 1000),
        }

    def _event_batch_exceeds_limit(self, batch: Dict, next_event_size: int) -> bool:
        """
        Check if adding the next event would exceed CloudWatch Logs limits.

        Args:
            batch: The current batch
            next_event_size: Size of the next event in bytes

        Returns:
            bool: True if adding the next event would exceed limits
        """
        return (
            len(batch["logEvents"]) >= self.CW_MAX_REQUEST_EVENT_COUNT
            or batch["byteTotal"] + next_event_size > self.CW_MAX_REQUEST_PAYLOAD_BYTES
        )

    def _is_batch_active(self, batch: Dict, target_timestamp_ms: int) -> bool:
        """
        Check whether the batch can still accept an event with the given timestamp,
        i.e. the batch would not span more than 24 hours and has not outlived the
        flush interval.

        Args:
            batch: The event batch
            target_timestamp_ms: The timestamp of the event to add

        Returns:
            bool: True if the batch is active and can accept the event
        """
        # New log event batch
        if batch["minTimestampMs"] == 0 or batch["maxTimestampMs"] == 0:
            return True

        # Check if adding the event would make the batch span more than 24 hours
        if target_timestamp_ms - batch["minTimestampMs"] > 24 * 3600 * 1000:
            return False

        if batch["maxTimestampMs"] - target_timestamp_ms > 24 * 3600 * 1000:
            return False

        # Flush the event batch once the 60-second interval is reached
        current_time = int(time.time() * 1000)
        if current_time - batch["createdTimestampMs"] >= self.BATCH_FLUSH_INTERVAL:
            return False

        return True

    def _append_to_batch(self, batch: Dict, log_event: Dict, event_size: int) -> None:
        """
        Append a log event to the batch.

        Args:
            batch: The event batch
            log_event: The log event to append
            event_size: Size of the event in bytes
        """
        batch["logEvents"].append(log_event)
        batch["byteTotal"] += event_size

        timestamp = log_event["timestamp"]
        if batch["minTimestampMs"] == 0 or batch["minTimestampMs"] > timestamp:
            batch["minTimestampMs"] = timestamp

        if batch["maxTimestampMs"] == 0 or batch["maxTimestampMs"] < timestamp:
            batch["maxTimestampMs"] = timestamp

    def _sort_log_events(self, batch: Dict) -> None:
        """
        Sort log events in the batch by timestamp.

        Args:
            batch: The event batch
        """
        batch["logEvents"] = sorted(batch["logEvents"], key=lambda x: x["timestamp"])

    def _send_log_batch(self, batch: Dict) -> Optional[Dict]:
        """
        Send a batch of log events to CloudWatch Logs.
        Creates the log group and stream lazily if they don't exist.

        Args:
            batch: The event batch

        Returns:
            The PutLogEvents response, or None if the batch is empty
        """
        if not batch["logEvents"]:
            return None

        # Sort log events by timestamp
        self._sort_log_events(batch)

        # Prepare the PutLogEvents request
        put_log_events_input = {
            "logGroupName": self.log_group_name,
            "logStreamName": self.log_stream_name,
            "logEvents": batch["logEvents"],
        }

        start_time = time.time()

        try:
            # Make the PutLogEvents call
            response = self.logs_client.put_log_events(**put_log_events_input)

            elapsed_ms = int((time.time() - start_time) * 1000)
            logger.debug(
                "Successfully sent %s log events (%s KB) in %s ms",
                len(batch["logEvents"]),
                batch["byteTotal"] / 1024,
                elapsed_ms,
            )

            return response

        except ClientError as error:
            # Handle resource-not-found errors by creating the log group/stream
            error_code = error.response.get("Error", {}).get("Code")
            if error_code == "ResourceNotFoundException":
                logger.info("Log group or stream not found, creating resources and retrying")

                try:
                    # Create the log group first
                    self._create_log_group_if_needed()
                    # Then create the log stream
                    self._create_log_stream_if_needed()

                    # Retry the PutLogEvents call
                    response = self.logs_client.put_log_events(**put_log_events_input)

                    elapsed_ms = int((time.time() - start_time) * 1000)
                    logger.debug(
                        "Successfully sent %s log events (%s KB) in %s ms after creating resources",
                        len(batch["logEvents"]),
                        batch["byteTotal"] / 1024,
                        elapsed_ms,
                    )

                    return response

                except ClientError as retry_error:
                    logger.error("Failed to send log events after creating resources: %s", retry_error)
                    raise
            else:
                logger.error("Failed to send log events: %s", error)
                raise

    def send_log_event(self, log_event: Dict[str, Any]) -> None:
        """
        Send a log event to CloudWatch Logs.

        This method implements the same logic as the Go version in the OTel Collector.
        It batches log events according to CloudWatch Logs constraints and sends the
        batch when it is full, spans more than 24 hours, or outlives the flush interval.

        Args:
            log_event: The log event to send
        """
        try:
            # Validate the log event
            if not self._validate_log_event(log_event):
                return

            # Calculate event size
            event_size = len(log_event["message"]) + self.CW_PER_EVENT_HEADER_BYTES

            # Initialize the event batch if needed
            if self._event_batch is None:
                self._event_batch = self._create_event_batch()

            # Check if we need to send the current batch and create a new one
            current_batch = self._event_batch
            if self._event_batch_exceeds_limit(current_batch, event_size) or not self._is_batch_active(
                current_batch, log_event["timestamp"]
            ):
                # Send the current batch
                self._send_log_batch(current_batch)
                # Create a new batch
                self._event_batch = self._create_event_batch()
                current_batch = self._event_batch

            # Add the log event to the batch
            self._append_to_batch(current_batch, log_event, event_size)

        except Exception as error:
            logger.error("Failed to process log event: %s", error)
            raise

    def flush_pending_events(self) -> bool:
        """
        Flush any pending log events.

        Returns:
            bool: True once any buffered events have been sent (including when
            there was nothing to send)
        """
        if self._event_batch is not None and len(self._event_batch["logEvents"]) > 0:
            current_batch = self._event_batch
            self._send_log_batch(current_batch)
            self._event_batch = self._create_event_batch()
            logger.debug("CloudWatchLogClient flushed the buffered log events")
        return True
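
A minimal usage sketch of the client above. The log group name and the EMF payload are illustrative placeholders (the committed file defines only the class), and credentials and region resolve through botocore's standard chain:

    import json
    import time

    # CloudWatchLogClient is the class from this commit; its module path is not
    # shown in the diff, so the import is omitted here.

    client = CloudWatchLogClient(log_group_name="/metrics/emf-example")

    # An EMF-formatted document: the "_aws" block tells CloudWatch which keys
    # to extract as metrics and dimensions.
    emf_log = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [
                {
                    "Namespace": "ExampleNamespace",
                    "Dimensions": [["Service"]],
                    "Metrics": [{"Name": "Latency", "Unit": "Milliseconds"}],
                }
            ],
        },
        "Service": "example-service",
        "Latency": 42,
    }

    # Each log event needs a string message and a millisecond timestamp.
    client.send_log_event({"message": json.dumps(emf_log), "timestamp": int(time.time() * 1000)})

    # Send whatever is still buffered, e.g. at shutdown.
    client.flush_pending_events()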
