Skip to content

Commit f7fcaaa

Browse files
committed
Merge remote-tracking branch 'liustve/logs-mainline' into genesis-logs
2 parents f0ebea2 + f571ffb commit f7fcaaa

File tree

6 files changed

+127
-55
lines changed

6 files changed

+127
-55
lines changed
Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,33 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int: # pylint: di
136136
"""
137137
Estimates the size in bytes of a log by calculating the size of its body and its attributes
138138
and adding a buffer amount to account for other log metadata information.
139-
Will process complex log structures up to the specified depth limit.
140-
Includes cycle detection to prevent processing the log content more than once.
141-
If the depth limit of the log structure is exceeded, returns the truncated calculation
142-
to everything up to that point.
139+
140+
Features:
141+
- Processes complex log structures up to the specified depth limit
142+
- Includes cycle detection to prevent processing the same content more than once
143+
- Returns truncated calculation if depth limit is exceeded
144+
145+
We set depth to 3 as this is the minimum required depth to estimate our consolidated Gen AI log events:
146+
147+
Example structure:
148+
{
149+
"output": {
150+
"messages": [
151+
{
152+
"content": "Hello, World!",
153+
"role": "assistant"
154+
}
155+
]
156+
},
157+
"input": {
158+
"messages": [
159+
{
160+
"content": "Say Hello, World!",
161+
"role": "user"
162+
}
163+
]
164+
}
165+
}
143166
144167
Args:
145168
log: The Log object to calculate size for
@@ -175,12 +198,12 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int: # pylint: di
175198
if next_val is None:
176199
continue
177200

178-
if isinstance(next_val, (str, bytes)):
201+
if isinstance(next_val, bytes):
179202
size += len(next_val)
180203
continue
181204

182-
if isinstance(next_val, (float, int, bool)):
183-
size += len(str(next_val))
205+
if isinstance(next_val, (str, float, int, bool)):
206+
size += AwsCloudWatchOtlpBatchLogRecordProcessor._estimate_utf8_size(str(next_val))
184207
continue
185208

186209
# next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"]
@@ -211,6 +234,20 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int: # pylint: di
211234

212235
return size
213236

237+
@staticmethod
238+
def _estimate_utf8_size(s: str):
239+
ascii_count = 0
240+
non_ascii_count = 0
241+
242+
for char in s:
243+
if ord(char) < 128:
244+
ascii_count += 1
245+
else:
246+
non_ascii_count += 1
247+
248+
# Estimate: ASCII chars (1 byte) + upper bound of non-ASCII chars 4 bytes
249+
return ascii_count + (non_ascii_count * 4)
250+
214251
# Only export the logs once to avoid the race condition of the worker thread and force flush thread
215252
# https://github.com/open-telemetry/opentelemetry-python/issues/3193
216253
# https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L199

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import logging
77
import random
88
from io import BytesIO
9-
from time import sleep, time
9+
from threading import Event
10+
from time import time
1011
from typing import Dict, Optional, Sequence
1112

1213
from botocore.session import Session
@@ -69,6 +70,7 @@ def __init__(
6970
compression=Compression.Gzip,
7071
session=AwsAuthSession(session=session, aws_region=self._aws_region, service="logs"),
7172
)
73+
self._shutdown_event = Event()
7274

7375
def export(self, batch: Sequence[LogData]) -> LogExportResult:
7476
"""
@@ -123,9 +125,18 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
123125
resp.reason,
124126
backoff_seconds,
125127
)
126-
sleep(backoff_seconds)
128+
# Use interruptible sleep that can be interrupted by shutdown
129+
if self._shutdown_event.wait(backoff_seconds):
130+
_logger.info("Export interrupted by shutdown")
131+
return LogExportResult.FAILURE
132+
127133
retry_num += 1
128134

135+
def shutdown(self) -> None:
136+
"""Shutdown the exporter and interrupt any ongoing waits."""
137+
self._shutdown_event.set()
138+
return super().shutdown()
139+
129140
def _send(self, serialized_data: bytes, timeout_sec: float):
130141
try:
131142
response = self._session.post(

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/common/test_aws_auth_session.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,3 @@ def test_aws_auth_session(self, _, __):
4646
self.assertIn(AUTHORIZATION_HEADER, actual_headers)
4747
self.assertIn(X_AMZ_DATE_HEADER, actual_headers)
4848
self.assertIn(X_AMZ_SECURITY_TOKEN_HEADER, actual_headers)
49-
50-
@patch("requests.Session.request", return_value=requests.Response())
51-
@patch("botocore.session.Session.get_credentials", return_value=mock_credentials)
52-
@patch("botocore.auth.SigV4Auth.add_auth", side_effect=Exception("Signing failed"))
53-
def test_aws_auth_session_signing_error(self, mock_add_auth, mock_get_credentials, mock_request):
54-
"""Tests that aws_auth_session does not any Sigv4 headers if signing errors."""
55-
56-
session = AwsAuthSession("us-east-1", "xray")
57-
actual_headers = {"test": "test"}
58-
59-
session.request("POST", AWS_OTLP_TRACES_ENDPOINT, data="", headers=actual_headers)
60-
61-
self.assertNotIn(AUTHORIZATION_HEADER, actual_headers)
62-
self.assertNotIn(X_AMZ_DATE_HEADER, actual_headers)
63-
self.assertNotIn(X_AMZ_SECURITY_TOKEN_HEADER, actual_headers)
Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import List
66
from unittest.mock import MagicMock, patch
77

8-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
8+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
99
AwsCloudWatchOtlpBatchLogRecordProcessor,
1010
BatchLogExportStrategy,
1111
)
@@ -116,8 +116,8 @@ def test_process_log_data_nested_structure_size_exceeds_max_log_size(self):
116116

117117
def test_process_log_data_primitive(self):
118118

119-
primitives: List[AnyValue] = ["test", b"test", 1, 1.2, True, False, None]
120-
expected_sizes = [4, 4, 1, 3, 4, 5, 0]
119+
primitives: List[AnyValue] = ["test", b"test", 1, 1.2, True, False, None, "深入 Python", "calfé"]
120+
expected_sizes = [4, 4, 1, 3, 4, 5, 0, 2 * 4 + len(" Python"), 1 * 4 + len("calf")]
121121

122122
for index, primitive in enumerate(primitives):
123123
log = self.generate_test_log_data(log_body=primitive, count=1)
@@ -136,11 +136,11 @@ def test_process_log_data_with_cycle(self):
136136
self.assertEqual(actual_size, expected_size)
137137

138138
@patch(
139-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach",
139+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.attach",
140140
return_value=MagicMock(),
141141
)
142-
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.detach")
143-
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value")
142+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.detach")
143+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.set_value")
144144
def test_export_single_batch_under_size_limit(self, _, __, ___):
145145
"""Tests that export is only called once if a single batch is under the size limit"""
146146
log_count = 10
@@ -163,11 +163,11 @@ def test_export_single_batch_under_size_limit(self, _, __, ___):
163163
self.mock_exporter.export.assert_called_once()
164164

165165
@patch(
166-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach",
166+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.attach",
167167
return_value=MagicMock(),
168168
)
169-
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.detach")
170-
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value")
169+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.detach")
170+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.set_value")
171171
def test_export_single_batch_all_logs_over_size_limit(self, _, __, ___):
172172
"""Should make multiple export calls of batch size 1 to export logs of size > 1 MB."""
173173

@@ -188,11 +188,11 @@ def test_export_single_batch_all_logs_over_size_limit(self, _, __, ___):
188188
self.assertEqual(len(batch[0]), 1)
189189

190190
@patch(
191-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach",
191+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.attach",
192192
return_value=MagicMock(),
193193
)
194-
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.detach")
195-
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value")
194+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.detach")
195+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.set_value")
196196
def test_export_single_batch_some_logs_over_size_limit(self, _, __, ___):
197197
"""Should make calls to export smaller sub-batch logs"""
198198
large_log_body = "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE + 1)
@@ -241,11 +241,11 @@ def test_force_flush_returns_false_when_shutdown(self):
241241
self.mock_exporter.export.assert_not_called()
242242

243243
@patch(
244-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach",
244+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.attach",
245245
return_value=MagicMock(),
246246
)
247-
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.detach")
248-
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value")
247+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.detach")
248+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.set_value")
249249
def test_force_flush_exports_only_one_batch(self, _, __, ___):
250250
"""Tests that force_flush should try to at least export one batch of logs. Rest of the logs will be dropped"""
251251
# Set max_export_batch_size to 5 to limit batch size

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,19 @@ def test_should_not_export_again_if_not_retryable(self, mock_request):
7474
self.assertEqual(result, LogExportResult.FAILURE)
7575

7676
@patch(
77-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
77+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait",
78+
side_effect=lambda x: False,
7879
)
7980
@patch("requests.Session.post", return_value=retryable_response_no_header)
80-
def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header(self, mock_request, mock_sleep):
81+
def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header(self, mock_request, mock_wait):
8182
"""Tests that multiple export requests are made with exponential delay if the response status code is retryable.
8283
But there is no Retry-After header."""
8384
self.exporter._timeout = 10000 # Large timeout to avoid early exit
8485
result = self.exporter.export(self.logs)
8586

86-
self.assertEqual(mock_sleep.call_count, _MAX_RETRYS - 1)
87+
self.assertEqual(mock_wait.call_count, _MAX_RETRYS - 1)
8788

88-
delays = mock_sleep.call_args_list
89+
delays = mock_wait.call_args_list
8990

9091
for index, delay in enumerate(delays):
9192
expected_base = 2**index
@@ -98,28 +99,31 @@ def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header
9899
self.assertEqual(result, LogExportResult.FAILURE)
99100

100101
@patch(
101-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
102+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait",
103+
side_effect=lambda x: False,
102104
)
103105
@patch(
104106
"requests.Session.post",
105107
side_effect=[retryable_response_header, retryable_response_header, retryable_response_header, good_response],
106108
)
107-
def test_should_export_again_with_server_delay_if_retryable_and_retry_after_header(self, mock_request, mock_sleep):
109+
def test_should_export_again_with_server_delay_if_retryable_and_retry_after_header(self, mock_request, mock_wait):
108110
"""Tests that multiple export requests are made with the server's suggested
109111
delay if the response status code is retryable and there is a Retry-After header."""
110112
self.exporter._timeout = 10000 # Large timeout to avoid early exit
111113
result = self.exporter.export(self.logs)
112-
delays = mock_sleep.call_args_list
114+
115+
delays = mock_wait.call_args_list
113116

114117
for delay in delays:
115118
self.assertEqual(delay[0][0], 10)
116119

117-
self.assertEqual(mock_sleep.call_count, 3)
120+
self.assertEqual(mock_wait.call_count, 3)
118121
self.assertEqual(mock_request.call_count, 4)
119122
self.assertEqual(result, LogExportResult.SUCCESS)
120123

121124
@patch(
122-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
125+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait",
126+
side_effect=lambda x: False,
123127
)
124128
@patch(
125129
"requests.Session.post",
@@ -131,13 +135,14 @@ def test_should_export_again_with_server_delay_if_retryable_and_retry_after_head
131135
],
132136
)
133137
def test_should_export_again_with_backoff_delay_if_retryable_and_bad_retry_after_header(
134-
self, mock_request, mock_sleep
138+
self, mock_request, mock_wait
135139
):
136140
"""Tests that multiple export requests are made with exponential delay if the response status code is retryable.
137141
but the Retry-After header is invalid or malformed."""
138142
self.exporter._timeout = 10000 # Large timeout to avoid early exit
139143
result = self.exporter.export(self.logs)
140-
delays = mock_sleep.call_args_list
144+
145+
delays = mock_wait.call_args_list
141146

142147
for index, delay in enumerate(delays):
143148
expected_base = 2**index
@@ -146,7 +151,7 @@ def test_should_export_again_with_backoff_delay_if_retryable_and_bad_retry_after
146151
self.assertGreaterEqual(actual_delay, expected_base * 0.8)
147152
self.assertLessEqual(actual_delay, expected_base * 1.2)
148153

149-
self.assertEqual(mock_sleep.call_count, 3)
154+
self.assertEqual(mock_wait.call_count, 3)
150155
self.assertEqual(mock_request.call_count, 4)
151156
self.assertEqual(result, LogExportResult.SUCCESS)
152157

@@ -159,28 +164,44 @@ def test_export_connection_error_retry(self, mock_request):
159164
self.assertEqual(result, LogExportResult.SUCCESS)
160165

161166
@patch(
162-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
167+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait",
168+
side_effect=lambda x: False,
163169
)
164170
@patch("requests.Session.post", return_value=retryable_response_no_header)
165-
def test_should_stop_retrying_when_deadline_exceeded(self, mock_request, mock_sleep):
171+
def test_should_stop_retrying_when_deadline_exceeded(self, mock_request, mock_wait):
166172
"""Tests that the exporter stops retrying when the deadline is exceeded."""
167173
self.exporter._timeout = 5 # Short timeout to trigger deadline check
168174

169-
# Mock time to simulate time passing
170175
with patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.time") as mock_time:
171176
# First call returns start time, subsequent calls simulate time passing
172177
mock_time.side_effect = [0, 0, 1, 2, 4, 8] # Exponential backoff would be 1, 2, 4 seconds
173178

174179
result = self.exporter.export(self.logs)
175180

176181
# Should stop before max retries due to deadline
177-
self.assertLess(mock_sleep.call_count, _MAX_RETRYS)
182+
self.assertLess(mock_wait.call_count, _MAX_RETRYS)
178183
self.assertLess(mock_request.call_count, _MAX_RETRYS + 1)
179184
self.assertEqual(result, LogExportResult.FAILURE)
180185

181186
# Verify total time passed is at the timeout limit
182187
self.assertGreaterEqual(5, self.exporter._timeout)
183188

189+
@patch(
190+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait",
191+
side_effect=lambda x: True,
192+
)
193+
@patch("requests.Session.post", return_value=retryable_response_no_header)
194+
def test_export_interrupted_by_shutdown(self, mock_request, mock_wait):
195+
"""Tests that export can be interrupted by shutdown during retry wait."""
196+
self.exporter._timeout = 10000
197+
198+
result = self.exporter.export(self.logs)
199+
200+
# Should make one request, then get interrupted during retry wait
201+
self.assertEqual(mock_request.call_count, 1)
202+
self.assertEqual(result, LogExportResult.FAILURE)
203+
mock_wait.assert_called_once()
204+
184205
@staticmethod
185206
def generate_test_log_data(count=5):
186207
logs = []

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
OtlpLogHeaderSetting,
2727
_check_emf_exporter_enabled,
2828
_custom_import_sampler,
29+
_customize_log_record_processor,
2930
_customize_logs_exporter,
3031
_customize_metric_exporters,
3132
_customize_resource,
@@ -46,6 +47,11 @@
4647
from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor
4748
from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter
4849
from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
50+
51+
# pylint: disable=line-too-long
52+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
53+
AwsCloudWatchOtlpBatchLogRecordProcessor,
54+
)
4955
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
5056
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
5157
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
@@ -1010,6 +1016,18 @@ def test_validate_and_fetch_logs_header(self):
10101016
# Clean up
10111017
os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None)
10121018

1019+
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled")
1020+
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_aws_otlp_endpoint")
1021+
def test_customize_log_record_processor_with_agent_observability(self, mock_is_aws_endpoint, mock_is_agent_enabled):
1022+
"""Test that AwsCloudWatchOtlpBatchLogRecordProcessor is used when agent observability is enabled"""
1023+
mock_exporter = MagicMock(spec=OTLPAwsLogExporter)
1024+
mock_is_agent_enabled.return_value = True
1025+
mock_is_aws_endpoint.return_value = True
1026+
1027+
processor = _customize_log_record_processor(mock_exporter)
1028+
1029+
self.assertIsInstance(processor, AwsCloudWatchOtlpBatchLogRecordProcessor)
1030+
10131031
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._validate_and_fetch_logs_header")
10141032
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session")
10151033
def test_create_emf_exporter(self, mock_get_session, mock_validate):

0 commit comments

Comments
 (0)