Commit 93e2836

add cycle detection
1 parent 886b009 commit 93e2836

3 files changed, +117 -98 lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 import logging
 import os
 import re
-from logging import NOTSET, Logger, getLogger
+from logging import Logger, getLogger
 from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union

 from importlib_metadata import version

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py

Lines changed: 27 additions & 8 deletions
@@ -2,7 +2,7 @@
 # SPDX-License-Identifier: Apache-2.0

 import logging
-from typing import List, Mapping, Optional, Sequence, cast
+from typing import Mapping, Optional, Sequence, cast

 from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
 from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
@@ -98,11 +98,12 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
                 _logger.exception("Exception while exporting logs: %s", exception)
             detach(token)

-    def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:
+    def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:  # pylint: disable=too-many-branches
         """
         Estimates the size in bytes of a log by calculating the size of its body and its attributes
         and adding a buffer amount to account for other log metadata information.
         Will process complex log structures up to the specified depth limit.
+        Includes cycle detection to prevent processing the log content more than once.
         If the depth limit of the log structure is exceeded, returns the truncated calculation
         to everything up to that point.
@@ -114,14 +115,21 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:
             int: The estimated size of the log object in bytes
         """

-        # Use a queue to prevent excessive recursive calls.
-        # We calculate based on the size of the log record body and attributes for the log.
-        queue: List[tuple[AnyValue, int]] = [(log.log_record.body, 0), (log.log_record.attributes, -1)]
+        # Queue contains tuples of (log_content, depth) where:
+        # - log_content is the current piece of log data being processed
+        # - depth tracks how many levels deep we've traversed to reach this content
+        # - body starts at depth 0 since it's an AnyValue object
+        # - Attributes start at depth -1 since it's a Mapping[str, AnyValue] - when traversed, we will
+        #   start processing its keys at depth 0
+        queue = [(log.log_record.body, 0), (log.log_record.attributes, -1)]
+
+        # Track visited complex log contents to avoid calculating the same one more than once
+        visited = set()

         size: int = self._BASE_LOG_BUFFER_BYTE_SIZE

         while queue:
-            new_queue: List[tuple[AnyValue, int]] = []
+            new_queue = []

             for data in queue:
                 # small optimization, can stop calculating the size once it reaches the 1 MB limit.
@@ -130,19 +138,30 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:

                 next_val, current_depth = data

-                if isinstance(next_val, (str, bytes)):
-                    size += len(next_val)
+                if next_val is None:
                     continue

                 if isinstance(next_val, bool):
                     size += 4 if next_val else 5
                     continue

+                if isinstance(next_val, (str, bytes)):
+                    size += len(next_val)
+                    continue
+
                 if isinstance(next_val, (float, int)):
                     size += len(str(next_val))
                     continue

+                # next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"]
                 if current_depth <= depth:
+                    obj_id = id(
+                        next_val
+                    )  # Guaranteed to be unique, see: https://www.w3schools.com/python/ref_func_id.asp
+                    if obj_id in visited:
+                        continue
+                    visited.add(obj_id)
+
                     if isinstance(next_val, Sequence):
                         for content in next_val:
                             new_queue.append((cast(AnyValue, content), current_depth + 1))
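
The key addition in this hunk is the visited set guarding the traversal. As a rough standalone sketch of the same pattern (a hypothetical estimate_size helper over plain dicts and lists, not the distro's actual implementation):

# Rough sketch of the cycle-detection pattern above (illustrative helper only):
# breadth-first traversal over nested dict/list content, skipping any container
# whose id() has already been seen.
from typing import Any


def estimate_size(content: Any, max_depth: int = 3) -> int:
    queue = [(content, 0)]
    visited = set()  # ids of containers already counted
    size = 0
    while queue:
        next_queue = []
        for value, depth in queue:
            if value is None:
                continue
            if isinstance(value, bool):
                size += 4 if value else 5
            elif isinstance(value, (str, bytes)):
                size += len(value)
            elif isinstance(value, (int, float)):
                size += len(str(value))
            elif depth <= max_depth and id(value) not in visited:
                visited.add(id(value))
                if isinstance(value, dict):
                    for key, item in value.items():
                        size += len(key)
                        next_queue.append((item, depth + 1))
                elif isinstance(value, (list, tuple)):
                    next_queue.extend((item, depth + 1) for item in value)
        queue = next_queue
    return size


# A self-referential dict no longer loops forever; its content is counted once:
cyclic = {"data": "test"}
cyclic["self_ref"] = cyclic
assert estimate_size(cyclic) == len("data") + len("test") + len("self_ref")

Because the guard is keyed on id(), a container that shows up more than once, whether through a true cycle or an ordinary shared reference, is only counted the first time it is reached.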

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_batch_log_record_processor.py

Lines changed: 89 additions & 89 deletions
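
The rewritten tests in the hunk below drop the old assertAlmostEqual(..., delta=10) checks in favour of exact arithmetic. As a hedged illustration of that arithmetic (plain Python mirroring the test's variable names, not part of the commit itself):

# Worked example of the nested-structure expectations (illustrative only):
log_key = "test"        # each dict level contributes its key
log_body = "X" * 400    # 400-byte leaf string
log_depth = 2

# Body nested as a dict, {"test": {"test": "X" * 400}}, counts every key plus the leaf:
expected_dict_size = len(log_key) * log_depth + len(log_body)   # 4 * 2 + 400 = 408

# Body nested as an array, [["X" * 400]], adds no keys, so only the leaf counts:
expected_array_size = len(log_body)                             # 400

# _estimate_log_size adds its fixed buffer on top of these, which is why the
# assertions subtract _BASE_LOG_BUFFER_BYTE_SIZE before comparing.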
@@ -24,96 +24,117 @@ def setUp(self):
         self.mock_exporter.export.return_value = LogExportResult.SUCCESS

         self.processor = AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=self.mock_exporter)
-        self.max_log_size = self.processor._MAX_LOG_REQUEST_BYTE_SIZE
-        self.base_log_size = self.processor._BASE_LOG_BUFFER_BYTE_SIZE

     def test_process_log_data_nested_structure(self):
         """Tests that the processor correctly handles nested structures (dict/list)"""
-        message_size = 400
-        message = "X" * message_size
+        log_body = "X" * 400
+        log_key = "test"
+        log_depth = 2

-        nest_dict_log = self.generate_test_log_data(
-            log_body=message, attr_key="t", attr_val=message, log_body_depth=2, attr_depth=2, count=1, create_map=True
+        nested_dict_log = self.generate_test_log_data(
+            log_body=log_body, log_key=log_key, log_body_depth=log_depth, count=1, create_map=True
         )
-        nest_array_log = self.generate_test_log_data(
-            log_body=message, attr_key="t", attr_val=message, log_body_depth=2, attr_depth=2, count=1, create_map=False
+        nested_array_log = self.generate_test_log_data(
+            log_body=log_body, log_key=log_key, log_body_depth=log_depth, count=1, create_map=False
        )

-        expected_size = self.base_log_size + message_size * 2
+        expected_dict_size = len(log_key) * log_depth + len(log_body)
+        expected_array_size = len(log_body)
+
+        dict_size = self.processor._estimate_log_size(log=nested_dict_log[0], depth=log_depth)
+        array_size = self.processor._estimate_log_size(log=nested_array_log[0], depth=log_depth)
+
+        self.assertEqual(dict_size - self.processor._BASE_LOG_BUFFER_BYTE_SIZE, expected_dict_size)
+        self.assertEqual(array_size - self.processor._BASE_LOG_BUFFER_BYTE_SIZE, expected_array_size)
+
+    def test_process_log_data_with_attributes(self):
+        """Tests that the processor correctly handles both body and attributes"""
+        log_body = "test_body"
+        attr_key = "attr_key"
+        attr_value = "attr_value"
+
+        record = LogRecord(
+            timestamp=int(time.time_ns()),
+            trace_id=0x123456789ABCDEF0123456789ABCDEF0,
+            span_id=0x123456789ABCDEF0,
+            trace_flags=TraceFlags(1),
+            severity_text="INFO",
+            severity_number=SeverityNumber.INFO,
+            body=log_body,
+            attributes={attr_key: attr_value},
+        )
+        log_data = LogData(log_record=record, instrumentation_scope=InstrumentationScope("test-scope", "1.0.0"))

-        dict_size = self.processor._estimate_log_size(log=nest_dict_log[0], depth=2)
-        array_size = self.processor._estimate_log_size(log=nest_array_log[0], depth=2)
+        expected_size = len(log_body) + len(attr_key) + len(attr_value)
+        actual_size = self.processor._estimate_log_size(log_data)

-        # Asserting almost equal to account for dictionary keys in the Log object
-        self.assertAlmostEqual(dict_size, expected_size, delta=10)
-        self.assertAlmostEqual(array_size, expected_size, delta=10)
+        self.assertEqual(actual_size - self.processor._BASE_LOG_BUFFER_BYTE_SIZE, expected_size)

     def test_process_log_data_nested_structure_exceeds_depth(self):
         """Tests that the processor cuts off calculation for nested structure that exceeds the depth limit"""
-        calculated = "X" * 400
-        message = {"calculated": calculated, "truncated": {"truncated": {"test": "X" * self.max_log_size}}}
-
-        # *2 since we set this message in both body and attributes
-        expected_size = self.base_log_size + (len("calculated") + len(calculated) + len("truncated")) * 2
+        max_depth = 0
+        calculated_body = "X" * 400
+        log_body = {
+            "calculated": "X" * 400,
+            "restOfThisLogWillBeTruncated": {"truncated": {"test": "X" * self.processor._MAX_LOG_REQUEST_BYTE_SIZE}},
+        }

-        nest_dict_log = self.generate_test_log_data(
-            log_body=message, attr_key="t", attr_val=message, log_body_depth=3, attr_depth=3, count=1, create_map=True
-        )
-        nest_array_log = self.generate_test_log_data(
-            log_body=message, attr_key="t", attr_val=message, log_body_depth=3, attr_depth=3, count=1, create_map=False
+        expected_size = self.processor._BASE_LOG_BUFFER_BYTE_SIZE + (
+            len("calculated") + len(calculated_body) + len("restOfThisLogWillBeTruncated")
         )

-        # Only calculates log size of up to depth of 4
-        dict_size = self.processor._estimate_log_size(log=nest_dict_log[0], depth=4)
-        array_size = self.processor._estimate_log_size(log=nest_array_log[0], depth=4)
+        test_logs = self.generate_test_log_data(log_body=log_body, count=1)
+
+        # Only calculates log size of up to depth of 0
+        dict_size = self.processor._estimate_log_size(log=test_logs[0], depth=max_depth)

-        # Asserting almost equal to account for dictionary keys in the Log object body
-        self.assertAlmostEqual(dict_size, expected_size, delta=10)
-        self.assertAlmostEqual(array_size, expected_size, delta=10)
+        self.assertEqual(dict_size, expected_size)

     def test_process_log_data_nested_structure_size_exceeds_max_log_size(self):
         """Tests that the processor returns prematurely if the size already exceeds _MAX_LOG_REQUEST_BYTE_SIZE"""
-        # Should stop calculation at bigKey
-        message = {
-            "bigKey": "X" * (self.max_log_size),
-            "smallKey": "X" * (self.max_log_size * 10),
+        # Should stop calculation at bigKey + biggerKey and not calculate the content of biggerKey
+        log_body = {
+            "bigKey": "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE),
+            "biggerKey": "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE * 100),
         }

-        expected_size = self.base_log_size + self.max_log_size + len("bigKey")
-
-        nest_dict_log = self.generate_test_log_data(
-            log_body=message, attr_key="", attr_val="", log_body_depth=-1, attr_depth=-1, count=1, create_map=True
-        )
-        nest_array_log = self.generate_test_log_data(
-            log_body=message, attr_key="", attr_val="", log_body_depth=-1, attr_depth=-1, count=1, create_map=False
+        expected_size = (
+            self.processor._BASE_LOG_BUFFER_BYTE_SIZE
+            + self.processor._MAX_LOG_REQUEST_BYTE_SIZE
+            + len("bigKey")
+            + len("biggerKey")
         )

+        nest_dict_log = self.generate_test_log_data(log_body=log_body, count=1, create_map=True)
+        nest_array_log = self.generate_test_log_data(log_body=log_body, count=1, create_map=False)
+
         dict_size = self.processor._estimate_log_size(log=nest_dict_log[0])
         array_size = self.processor._estimate_log_size(log=nest_array_log[0])

-        self.assertAlmostEqual(dict_size, expected_size, delta=10)
-        self.assertAlmostEqual(array_size, expected_size, delta=10)
+        self.assertEqual(dict_size, expected_size)
+        self.assertEqual(array_size, expected_size)

     def test_process_log_data_primitive(self):

         primitives: List[AnyValue] = ["test", b"test", 1, 1.2, True, False, None]
         expected_sizes = [4, 4, 1, 3, 4, 5, 0]

         for index, primitive in enumerate(primitives):
-            log = self.generate_test_log_data(
-                log_body=primitive,
-                attr_key="",
-                attr_val="",
-                log_body_depth=-1,
-                attr_depth=-1,
-                count=1,
-            )
-
-            expected_size = self.base_log_size + expected_sizes[index]
+            log = self.generate_test_log_data(log_body=primitive, count=1)
+            expected_size = self.processor._BASE_LOG_BUFFER_BYTE_SIZE + expected_sizes[index]
             actual_size = self.processor._estimate_log_size(log[0])
-
             self.assertEqual(actual_size, expected_size)

+    def test_process_log_data_with_cycle(self):
+        """Test that processor handles processing logs with circular references only once"""
+        cyclic_dict: dict = {"data": "test"}
+        cyclic_dict["self_ref"] = cyclic_dict
+
+        log = self.generate_test_log_data(log_body=cyclic_dict, count=1)
+        expected_size = self.processor._BASE_LOG_BUFFER_BYTE_SIZE + len("data") + len("self_ref") + len("test")
+        actual_size = self.processor._estimate_log_size(log[0])
+        self.assertEqual(actual_size, expected_size)
+
     @patch(
         "amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach",
         return_value=MagicMock(),
@@ -124,9 +145,7 @@ def test_export_single_batch_under_size_limit(self, _, __, ___):
         """Tests that export is only called once if a single batch is under the size limit"""
         log_count = 10
         log_body = "test"
-        test_logs = self.generate_test_log_data(
-            log_body=log_body, attr_key="", attr_val="", log_body_depth=-1, attr_depth=-1, count=log_count
-        )
+        test_logs = self.generate_test_log_data(log_body=log_body, count=log_count)
         total_data_size = 0

         for log in test_logs:
@@ -153,9 +172,7 @@ def test_export_single_batch_all_logs_over_size_limit(self, _, __, ___):
         """Should make multiple export calls of batch size 1 to export logs of size > 1 MB."""

         large_log_body = "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE + 1)
-        test_logs = self.generate_test_log_data(
-            log_body=large_log_body, attr_key="", attr_val="", log_body_depth=-1, attr_depth=-1, count=15
-        )
+        test_logs = self.generate_test_log_data(log_body=large_log_body, count=15)

         for log in test_logs:
             self.processor._queue.appendleft(log)
@@ -178,26 +195,13 @@ def test_export_single_batch_all_logs_over_size_limit(self, _, __, ___):
     @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value")
     def test_export_single_batch_some_logs_over_size_limit(self, _, __, ___):
         """Should make calls to export smaller sub-batch logs"""
-        large_log_body = "X" * (self.max_log_size + 1)
-        small_log_body = "X" * (self.max_log_size // 10 - self.base_log_size)
-
-        large_logs = self.generate_test_log_data(
-            log_body=large_log_body,
-            attr_key="",
-            attr_val="",
-            log_body_depth=-1,
-            attr_depth=-1,
-            count=3,
+        large_log_body = "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE + 1)
+        small_log_body = "X" * (
+            self.processor._MAX_LOG_REQUEST_BYTE_SIZE // 10 - self.processor._BASE_LOG_BUFFER_BYTE_SIZE
         )

-        small_logs = self.generate_test_log_data(
-            log_body=small_log_body,
-            attr_key="",
-            attr_val="",
-            log_body_depth=-1,
-            attr_depth=-1,
-            count=12,
-        )
+        large_logs = self.generate_test_log_data(log_body=large_log_body, count=3)
+        small_logs = self.generate_test_log_data(log_body=small_log_body, count=12)

         # 1st, 2nd, 3rd batch = size 1
         # 4th batch = size 10
230234
@staticmethod
231235
def generate_test_log_data(
232236
log_body,
233-
attr_key,
234-
attr_val,
235-
log_body_depth=3,
236-
attr_depth=3,
237+
log_key="key",
238+
log_body_depth=0,
237239
count=5,
238240
create_map=True,
239-
instrumentation_scope=InstrumentationScope("test-scope", "1.0.0"),
240241
) -> List[LogData]:
241242

242243
def generate_nested_value(depth, value, create_map=True) -> AnyValue:
243-
if depth < 0:
244+
if depth <= 0:
244245
return value
245246

246247
if create_map:
247-
return {"t": generate_nested_value(depth - 1, value, True)}
248+
return {log_key: generate_nested_value(depth - 1, value, True)}
248249

249250
return [generate_nested_value(depth - 1, value, False)]
250251

251252
logs = []
252253

253-
for index in range(count):
254+
for _ in range(count):
254255
record = LogRecord(
255256
timestamp=int(time.time_ns()),
256-
trace_id=int(f"0x{index + 1:032x}", 16),
257-
span_id=int(f"0x{index + 1:016x}", 16),
257+
trace_id=0x123456789ABCDEF0123456789ABCDEF0,
258+
span_id=0x123456789ABCDEF0,
258259
trace_flags=TraceFlags(1),
259260
severity_text="INFO",
260261
severity_number=SeverityNumber.INFO,
261262
body=generate_nested_value(log_body_depth, log_body, create_map),
262-
attributes={attr_key: generate_nested_value(attr_depth, attr_val, create_map)},
263263
)
264264

265265
log_data = LogData(log_record=record, instrumentation_scope=InstrumentationScope("test-scope", "1.0.0"))

0 commit comments

Comments
 (0)