Skip to content

Commit ff2fb5d

Browse files
committed
Refactor OTLP AWS log exporter; add comments to AWS batch log processor
1 parent 651f283 commit ff2fb5d

File tree

4 files changed

+141
-67
lines changed

4 files changed

+141
-67
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License, Version 2.0.
34

45
import logging
56
from typing import Mapping, Optional, Sequence, cast
@@ -19,7 +20,7 @@ class AwsCloudWatchOtlpBatchLogRecordProcessor(BatchLogRecordProcessor):
1920
Custom implementation of BatchLogRecordProcessor that manages log record batching
2021
with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.
2122
22-
This processor still exports all logs up to _max_export_batch_size but rather than doing exactly
23+
This processor still exports all logs up to _MAX_LOG_REQUEST_BYTE_SIZE but rather than doing exactly
2324
one export, we will estimate log sizes and do multiple batch exports
2425
where each exported batch will have an additional constraint:
2526
@@ -29,9 +30,41 @@ class AwsCloudWatchOtlpBatchLogRecordProcessor(BatchLogRecordProcessor):
2930
A unique case is if the sub-batch is of data size > 1 MB, then the sub-batch will have exactly 1 log in it.
3031
"""
3132

32-
_BASE_LOG_BUFFER_BYTE_SIZE = (
33-
1000 # Buffer size in bytes to account for log metadata not included in the body or attribute size calculation
34-
)
33+
# OTel log events include fixed metadata attributes, so the metadata size
34+
# can be estimated on a best-effort basis as follows:
35+
# service.name (255 chars) + cloud.resource_id (max ARN length) + telemetry.xxx (~20 chars) +
36+
# common attributes (255 chars) +
37+
# scope + flags + traceId + spanId + numeric/timestamp fields + ...
38+
# Example log structure:
39+
# {
40+
# "resource": {
41+
# "attributes": {
42+
# "aws.local.service": "example-service123",
43+
# "telemetry.sdk.language": "python",
44+
# "service.name": "my-application",
45+
# "cloud.resource_id": "example-resource",
46+
# "aws.log.group.names": "example-log-group",
47+
# "aws.ai.agent.type": "default",
48+
# "telemetry.sdk.version": "1.x.x",
49+
# "telemetry.auto.version": "0.x.x",
50+
# "telemetry.sdk.name": "opentelemetry"
51+
# }
52+
# },
53+
# "scope": {"name": "example.instrumentation.library"},
54+
# "timeUnixNano": 1234567890123456789,
55+
# "observedTimeUnixNano": 1234567890987654321,
56+
# "severityNumber": 9,
57+
# "body": {...},
58+
# "attributes": {...},
59+
# "flags": 1,
60+
# "traceId": "abcd1234efgh5678ijkl9012mnop3456",
61+
# "spanId": "1234abcd5678efgh"
62+
# }
63+
# 2000 might be a bit of an overestimate but it's better to overestimate the size of the log
64+
# and suffer a small performance impact with batching than it is to underestimate and risk
65+
# a large log being dropped when sent to the AWS otlp endpoint.
66+
_BASE_LOG_BUFFER_BYTE_SIZE = 2000
67+
3568
_MAX_LOG_REQUEST_BYTE_SIZE = (
3669
1048576 # Maximum uncompressed/unserialized bytes / request -
3770
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
@@ -63,10 +96,11 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
6396
https://github.com/open-telemetry/opentelemetry-python/blob/bb21ebd46d070c359eee286c97bdf53bfd06759d/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
6497
6598
Preserves existing batching behavior but will export intermediate small log batches if
66-
the size of the data in the batch is at or above AWS CloudWatch's maximum request size limit of 1 MB.
99+
the size of the data in the batch is estimated to be at or above AWS CloudWatch's
100+
maximum request size limit of 1 MB.
67101
68-
- Data size of exported batches will ALWAYS be <= 1 MB except for the case below:
69-
- If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
102+
- Estimated data size of exported batches will typically be <= 1 MB except for the case below:
103+
- If the estimated data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
70104
"""
71105
with self._export_lock:
72106
iteration = 0
@@ -141,19 +175,17 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int: # pylint: di
141175
if next_val is None:
142176
continue
143177

144-
if isinstance(next_val, bool):
145-
size += 4 if next_val else 5
146-
continue
147-
148178
if isinstance(next_val, (str, bytes)):
149179
size += len(next_val)
150180
continue
151181

152-
if isinstance(next_val, (float, int)):
182+
if isinstance(next_val, (float, int, bool)):
153183
size += len(str(next_val))
154184
continue
155185

156-
# next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"],
186+
# next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"]
187+
# See: https://github.com/open-telemetry/opentelemetry-python/blob/\
188+
# 9426d6da834cfb4df7daedd4426bba0aa83165b5/opentelemetry-api/src/opentelemetry/util/types.py#L20
157189
if current_depth <= depth:
158190
obj_id = id(
159191
next_val

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py

Lines changed: 48 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,37 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License, Version 2.0.
34

45
import gzip
56
import logging
7+
import random
68
from io import BytesIO
7-
from time import sleep
9+
from time import sleep, time
810
from typing import Dict, Optional, Sequence
911

1012
from requests import Response
1113
from requests.exceptions import ConnectionError as RequestsConnectionError
1214
from requests.structures import CaseInsensitiveDict
1315

1416
from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
15-
from opentelemetry.exporter.otlp.proto.common._internal import _create_exp_backoff_generator
1617
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
1718
from opentelemetry.exporter.otlp.proto.http import Compression
1819
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
1920
from opentelemetry.sdk._logs import LogData
2021
from opentelemetry.sdk._logs.export import LogExportResult
2122

2223
_logger = logging.getLogger(__name__)
24+
_MAX_RETRYS = 6
2325

2426

2527
class OTLPAwsLogExporter(OTLPLogExporter):
28+
"""
29+
This exporter extends the functionality of the OTLPLogExporter to allow logs to be exported
30+
to the CloudWatch Logs OTLP endpoint https://logs.[AWSRegion].amazonaws.com/v1/logs. Utilizes the aws-sdk
31+
library to sign and directly inject SigV4 Authentication to the exported request's headers.
32+
33+
See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
34+
"""
2635

2736
_RETRY_AFTER_HEADER = "Retry-After" # See: https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
2837

@@ -56,12 +65,13 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
5665
"""
5766
Exports log batch with AWS-specific enhancements over the base OTLPLogExporter.
5867
59-
Based on upstream implementation which does not retry based on Retry-After header:
60-
https://github.com/open-telemetry/opentelemetry-python/blob/acae2c232b101d3e447a82a7161355d66aa06fa2/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167
68+
Key differences from upstream OTLPLogExporter:
69+
1. Respects Retry-After header from server responses for proper throttling
70+
2. Treats HTTP 429 (Too Many Requests) as a retryable exception
71+
3. Always compresses data with gzip before sending
6172
62-
Key behaviors:
63-
1. Always compresses data with gzip before sending
64-
2. Implements Retry-After header support for throttling responses
73+
Upstream implementation does not support Retry-After header:
74+
https://github.com/open-telemetry/opentelemetry-python/blob/acae2c232b101d3e447a82a7161355d66aa06fa2/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167
6575
"""
6676

6777
if self._shutdown:
@@ -74,52 +84,50 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
7484
gzip_stream.write(serialized_data)
7585
data = gzip_data.getvalue()
7686

77-
backoff = _create_exp_backoff_generator(max_value=self._MAX_RETRY_TIMEOUT)
87+
deadline_sec = time() + self._timeout
88+
retry_num = 0
7889

7990
# This loop will eventually terminate because:
8091
# 1) The export request will eventually either succeed or fail permanently
81-
# 2) The exponential backoff generator has a max value of _MAX_RETRY_TIMEOUT (64s)
82-
# 3) After enough retries, delay will equal _MAX_RETRY_TIMEOUT, forcing exit
92+
# 2) Maximum retries (_MAX_RETRYS = 6) will be reached
93+
# 3) Deadline timeout will be exceeded
8394
# 4) Non-retryable errors (4xx except 429) immediately exit the loop
8495
while True:
85-
resp = self._send(data)
96+
resp = self._send(data, deadline_sec - time())
8697

8798
if resp.ok:
8899
return LogExportResult.SUCCESS
89100

90-
delay = self._get_retry_delay_sec(resp.headers, backoff)
101+
backoff_seconds = self._get_retry_delay_sec(resp.headers, retry_num)
91102
is_retryable = self._retryable(resp)
92103

93-
if not is_retryable or delay == self._MAX_RETRY_TIMEOUT:
94-
if is_retryable:
95-
_logger.error(
96-
"Failed to export logs due to retries exhausted "
97-
"after transient error %s encountered while exporting logs batch",
98-
resp.reason,
99-
)
100-
else:
101-
_logger.error(
102-
"Failed to export logs batch code: %s, reason: %s",
103-
resp.status_code,
104-
resp.text,
105-
)
104+
if not is_retryable or retry_num + 1 == _MAX_RETRYS or backoff_seconds > (deadline_sec - time()):
105+
_logger.error(
106+
"Failed to export logs batch code: %s, reason: %s",
107+
resp.status_code,
108+
resp.text,
109+
)
106110
return LogExportResult.FAILURE
107111

108112
_logger.warning(
109-
"Transient error %s encountered while exporting logs batch, retrying in %ss.",
113+
"Transient error %s encountered while exporting logs batch, retrying in %.2fs.",
110114
resp.reason,
111-
delay,
115+
backoff_seconds,
112116
)
113117

114-
sleep(delay)
118+
# Make sleep interruptible by checking shutdown status
119+
if self._shutdown:
120+
return LogExportResult.FAILURE
121+
sleep(backoff_seconds)
122+
retry_num += 1
115123

116-
def _send(self, serialized_data: bytes):
124+
def _send(self, serialized_data: bytes, timeout_sec: float):
117125
try:
118126
response = self._session.post(
119127
url=self._endpoint,
120128
data=serialized_data,
121129
verify=self._certificate_file,
122-
timeout=self._timeout,
130+
timeout=timeout_sec,
123131
cert=self._client_cert,
124132
)
125133
return response
@@ -128,37 +136,31 @@ def _send(self, serialized_data: bytes):
128136
url=self._endpoint,
129137
data=serialized_data,
130138
verify=self._certificate_file,
131-
timeout=self._timeout,
139+
timeout=timeout_sec,
132140
cert=self._client_cert,
133141
)
134142
return response
135143

136144
@staticmethod
137145
def _retryable(resp: Response) -> bool:
138146
"""
139-
Is it a retryable response?
147+
Logic based on https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
140148
"""
141149
# See: https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
142150

143151
return resp.status_code in (429, 503) or OTLPLogExporter._retryable(resp)
144152

145-
def _get_retry_delay_sec(self, headers: CaseInsensitiveDict, backoff) -> float:
153+
def _get_retry_delay_sec(self, headers: CaseInsensitiveDict, retry_num: int) -> float:
146154
"""
147155
Get retry delay in seconds from headers or backoff strategy.
148156
"""
149-
# See: https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
150-
maybe_retry_after = headers.get(self._RETRY_AFTER_HEADER, None)
151-
152-
# Set the next retry delay to the value of the Retry-After response in the headers.
153-
# If Retry-After is not present in the headers, default to the next iteration of the
154-
# exponential backoff strategy.
155-
156-
delay = self._parse_retryable_header(maybe_retry_after)
157-
158-
if delay == -1:
159-
delay = next(backoff, self._MAX_RETRY_TIMEOUT)
160-
161-
return delay
157+
# Check for Retry-After header first, then use exponential backoff with jitter
158+
retry_after_delay = self._parse_retryable_header(headers.get(self._RETRY_AFTER_HEADER))
159+
if retry_after_delay > -1:
160+
return retry_after_delay
161+
else:
162+
# multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
163+
return 2**retry_num * random.uniform(0.8, 1.2)
162164

163165
@staticmethod
164166
def _parse_retryable_header(retry_header: Optional[str]) -> float:

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@
1818

1919

2020
class OTLPAwsSpanExporter(OTLPSpanExporter):
21+
"""
22+
This exporter extends the functionality of the OTLPSpanExporter to allow spans to be exported
23+
to the XRay OTLP endpoint https://xray.[AWSRegion].amazonaws.com/v1/traces. Utilizes the
24+
AwsAuthSession to sign and directly inject SigV4 Authentication to the exported request's headers.
25+
26+
See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
27+
"""
28+
2129
def __init__(
2230
self,
2331
endpoint: Optional[str] = None,

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import requests
88
from requests.structures import CaseInsensitiveDict
99

10-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
10+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import _MAX_RETRYS, OTLPAwsLogExporter
1111
from opentelemetry._logs.severity import SeverityNumber
1212
from opentelemetry.sdk._logs import LogData, LogRecord
1313
from opentelemetry.sdk._logs.export import LogExportResult
@@ -80,18 +80,21 @@ def test_should_not_export_again_if_not_retryable(self, mock_request):
8080
def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header(self, mock_request, mock_sleep):
8181
"""Tests that multiple export requests are made with exponential delay if the response status code is retryable.
8282
But there is no Retry-After header."""
83+
self.exporter._timeout = 10000 # Large timeout to avoid early exit
8384
result = self.exporter.export(self.logs)
8485

85-
# 1, 2, 4, 8, 16, 32 delays
86-
self.assertEqual(mock_sleep.call_count, 6)
86+
self.assertEqual(mock_sleep.call_count, _MAX_RETRYS - 1)
8787

8888
delays = mock_sleep.call_args_list
8989

9090
for index, delay in enumerate(delays):
91-
self.assertEqual(delay[0][0], 2**index)
91+
expected_base = 2**index
92+
actual_delay = delay[0][0]
93+
# Assert delay is within jitter range: base * [0.8, 1.2]
94+
self.assertGreaterEqual(actual_delay, expected_base * 0.8)
95+
self.assertLessEqual(actual_delay, expected_base * 1.2)
9296

93-
# Number of calls: 1 + len(1, 2, 4, 8, 16, 32 delays)
94-
self.assertEqual(mock_request.call_count, 7)
97+
self.assertEqual(mock_request.call_count, _MAX_RETRYS)
9598
self.assertEqual(result, LogExportResult.FAILURE)
9699

97100
@patch(
@@ -104,6 +107,7 @@ def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header
104107
def test_should_export_again_with_server_delay_if_retryable_and_retry_after_header(self, mock_request, mock_sleep):
105108
"""Tests that multiple export requests are made with the server's suggested
106109
delay if the response status code is retryable and there is a Retry-After header."""
110+
self.exporter._timeout = 10000 # Large timeout to avoid early exit
107111
result = self.exporter.export(self.logs)
108112
delays = mock_sleep.call_args_list
109113

@@ -130,12 +134,17 @@ def test_should_export_again_with_backoff_delay_if_retryable_and_bad_retry_after
130134
self, mock_request, mock_sleep
131135
):
132136
"""Tests that multiple export requests are made with exponential delay if the response status code is retryable.
133-
but the Retry-After header ins invalid or malformed."""
137+
but the Retry-After header is invalid or malformed."""
138+
self.exporter._timeout = 10000 # Large timeout to avoid early exit
134139
result = self.exporter.export(self.logs)
135140
delays = mock_sleep.call_args_list
136141

137142
for index, delay in enumerate(delays):
138-
self.assertEqual(delay[0][0], 2**index)
143+
expected_base = 2**index
144+
actual_delay = delay[0][0]
145+
# Assert delay is within jitter range: base * [0.8, 1.2]
146+
self.assertGreaterEqual(actual_delay, expected_base * 0.8)
147+
self.assertLessEqual(actual_delay, expected_base * 1.2)
139148

140149
self.assertEqual(mock_sleep.call_count, 3)
141150
self.assertEqual(mock_request.call_count, 4)
@@ -149,6 +158,29 @@ def test_export_connection_error_retry(self, mock_request):
149158
self.assertEqual(mock_request.call_count, 2)
150159
self.assertEqual(result, LogExportResult.SUCCESS)
151160

161+
@patch(
162+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
163+
)
164+
@patch("requests.Session.post", return_value=retryable_response_no_header)
165+
def test_should_stop_retrying_when_deadline_exceeded(self, mock_request, mock_sleep):
166+
"""Tests that the exporter stops retrying when the deadline is exceeded."""
167+
self.exporter._timeout = 5 # Short timeout to trigger deadline check
168+
169+
# Mock time to simulate time passing
170+
with patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.time") as mock_time:
171+
# First call returns start time, subsequent calls simulate time passing
172+
mock_time.side_effect = [0, 0, 1, 2, 4, 8] # Exponential backoff would be 1, 2, 4 seconds
173+
174+
result = self.exporter.export(self.logs)
175+
176+
# Should stop before max retries due to deadline
177+
self.assertLess(mock_sleep.call_count, _MAX_RETRYS)
178+
self.assertLess(mock_request.call_count, _MAX_RETRYS + 1)
179+
self.assertEqual(result, LogExportResult.FAILURE)
180+
181+
# Verify total time passed is at the timeout limit
182+
self.assertGreaterEqual(5, self.exporter._timeout)
183+
152184
@staticmethod
153185
def generate_test_log_data(count=5):
154186
logs = []

0 commit comments

Comments
 (0)