Commit 80a9fd3

Add AwsBatchLogProcessor and OtlpAwsLogExporter Logs Pipeline (#402)
In order to support the CloudWatch Logs handling of large log objects (LLO) and adhere to their new limits in ADOT, the CloudWatch Logs team indicated that an LLO should be exported immediately, per the following ingestion behavior:

```
1. For each new log record:
   1. If (buffer_size + record_size) < 1MB: Add to buffer
   2. If (buffer_size + record_size) > 1MB: Ingest current buffer, then add new record to empty buffer
2. If buffer_size > 1MB: Ingest immediately
3. Repeat from step 1
```

(A short Python sketch of this rule follows the commit message.)

They also require a new header on the log object that contains all of the paths to the String leaf nodes in the log body, represented in `JSON protobuf` format. Below is a representation of the consolidated Gen AI event log schema in JSON protobuf format.

*Description of changes:*

1. Adds `AwsBatchLogRecordProcessor`, a backwards-compatible custom logs BatchProcessor with the following invariants:
   - The unserialized, uncompressed data size of an exported batch will ALWAYS be <= 1 MB, except for the case below:
   - If the data size of an exported batch is ever > 1 MB, then that batch always has length 1.
2. Adds support for estimating the size of a log record based on its body and attributes. Because logs are represented as a self-referential data type, a log record can be deeply nested and complex, so a depth limit is applied to how far the log is traversed in order to limit the performance impact of the size calculation.
3. `OTLPAwsLogExporter`: adds a new retry-delay behavior based on the server-side `Retry-After` response header. ~~Injects the LLO header flag if the size of the exported data > 1 MB.~~
4. Customizes the auto instrumentation to use the new `AwsBatchLogRecordProcessor`.

*Testing*

- Unit tests for `AwsBatchLogRecordProcessor` covering the new LLO handling logic
- Unit tests for `OtlpAwsLogExporter` validating the new retry and LLO header logic
- Manual E2E testing to verify that the batching and LLO path-header logic works for large logs (the log size was mocked by setting it to 1,000,000 bytes)

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
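The ingestion rule above can be read as a simple size-bounded buffer. A minimal sketch, assuming hypothetical `estimate_size` and `ingest` callables (none of these names are part of the change):

```python
# Illustrative sketch of the CloudWatch Logs ingestion rule quoted above; not the actual implementation.
MAX_REQUEST_BYTES = 1_048_576  # 1 MB uncompressed/unserialized limit


def buffer_and_ingest(records, estimate_size, ingest):
    """records: iterable of log records; estimate_size and ingest are hypothetical callables."""
    buffer, buffer_size = [], 0
    for record in records:
        record_size = estimate_size(record)
        if buffer and buffer_size + record_size > MAX_REQUEST_BYTES:
            # Adding this record would overflow the buffer: ingest what we have first.
            ingest(buffer)
            buffer, buffer_size = [], 0
        buffer.append(record)
        buffer_size += record_size
        if buffer_size > MAX_REQUEST_BYTES:
            # A single record above 1 MB (an LLO) is ingested immediately, alone in its request.
            ingest(buffer)
            buffer, buffer_size = [], 0
    if buffer:
        ingest(buffer)
```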
1 parent 21ba7fc commit 80a9fd3

9 files changed: +956 -29 lines changed


aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 34 additions & 16 deletions

@@ -1,9 +1,10 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
+import logging
 import os
 import re
-from logging import NOTSET, Logger, getLogger
+from logging import Logger, getLogger
 from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union
 
 from importlib_metadata import version
@@ -22,12 +23,18 @@
     AwsMetricAttributesSpanExporterBuilder,
 )
 from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
+
+# pylint: disable=line-too-long
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
+    AwsCloudWatchOtlpBatchLogRecordProcessor,
+)
 from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
 from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
 from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
 from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
 from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
+from opentelemetry._events import set_event_logger_provider
 from opentelemetry._logs import get_logger_provider, set_logger_provider
 from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
@@ -42,7 +49,9 @@
     _import_id_generator,
     _import_sampler,
     _OTelSDKConfigurator,
+    _patch_basic_config,
 )
+from opentelemetry.sdk._events import EventLoggerProvider
 from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
 from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
 from opentelemetry.sdk.environment_variables import (
@@ -197,26 +206,28 @@ def _initialize_components():
 
 
 def _init_logging(
-    exporters: Dict[str, Type[LogExporter]],
-    resource: Resource = None,
+    exporters: dict[str, Type[LogExporter]],
+    resource: Optional[Resource] = None,
+    setup_logging_handler: bool = True,
 ):
-
-    # Provides a default OTLP log exporter when none is specified.
-    # This is the behavior for the logs exporters for other languages.
-    if not exporters:
-        exporters = {"otlp": OTLPLogExporter}
-
     provider = LoggerProvider(resource=resource)
     set_logger_provider(provider)
 
     for _, exporter_class in exporters.items():
-        exporter_args: Dict[str, any] = {}
-        log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
-        provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
+        exporter_args = {}
+        log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args))
+        log_processor = _customize_log_record_processor(log_exporter)
+        provider.add_log_record_processor(log_processor)
+
+    event_logger_provider = EventLoggerProvider(logger_provider=provider)
+    set_event_logger_provider(event_logger_provider)
 
-    handler = LoggingHandler(level=NOTSET, logger_provider=provider)
+    if setup_logging_handler:
+        _patch_basic_config()
 
-    getLogger().addHandler(handler)
+        # Add OTel handler
+        handler = LoggingHandler(level=logging.NOTSET, logger_provider=provider)
+        logging.getLogger().addHandler(handler)
 
 
 def _init_tracing(
@@ -417,7 +428,14 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
     return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
 
 
-def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> LogExporter:
+def _customize_log_record_processor(log_exporter: LogExporter):
+    if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
+        return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)
+
+    return BatchLogRecordProcessor(exporter=log_exporter)
+
+
+def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
    logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
 
    if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
@@ -586,7 +604,7 @@ def _is_lambda_environment():
     return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ
 
 
-def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> bool:
+def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool:
     """Is the given endpoint an AWS OTLP endpoint?"""
 
     pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN
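Taken together, the processor selection introduced above reduces to a single rule. A minimal sketch as a standalone helper (the `pick_log_processor` name and the explicit `agent_observability_enabled` flag are illustrative, not part of the commit):

```python
# Simplified restatement of _customize_log_record_processor (illustrative only).
from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
    AwsCloudWatchOtlpBatchLogRecordProcessor,
)
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter


def pick_log_processor(exporter: LogExporter, agent_observability_enabled: bool):
    """Only the AWS OTLP log exporter in agent-observability mode gets the CloudWatch-aware processor."""
    if isinstance(exporter, OTLPAwsLogExporter) and agent_observability_enabled:
        return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=exporter)
    return BatchLogRecordProcessor(exporter=exporter)
```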
aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py

Lines changed: 249 additions & 0 deletions
@@ -0,0 +1,249 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
+
+import logging
+from typing import Mapping, Optional, Sequence, cast
+
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
+from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
+from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
+from opentelemetry.util.types import AnyValue
+
+_logger = logging.getLogger(__name__)
+
+
+class AwsCloudWatchOtlpBatchLogRecordProcessor(BatchLogRecordProcessor):
+    """
+    Custom implementation of BatchLogRecordProcessor that manages log record batching
+    with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.
+
+    This processor still exports all logs up to _MAX_LOG_REQUEST_BYTE_SIZE but rather than doing exactly
+    one export, we will estimate log sizes and do multiple batch exports
+    where each exported batch will have an additional constraint:
+
+    If the batch to be exported will have a data size of > 1 MB:
+        The batch will be split into multiple exports of sub-batches of data size <= 1 MB.
+
+    A unique case is if the sub-batch is of data size > 1 MB, then the sub-batch will have exactly 1 log in it.
+    """
+
+    # OTel log events include fixed metadata attributes so the estimated metadata size
+    # possibly be calculated as this with best efforts:
+    # service.name (255 chars) + cloud.resource_id (max ARN length) + telemetry.xxx (~20 chars) +
+    # common attributes (255 chars) +
+    # scope + flags + traceId + spanId + numeric/timestamp fields + ...
+    # Example log structure:
+    # {
+    #     "resource": {
+    #         "attributes": {
+    #             "aws.local.service": "example-service123",
+    #             "telemetry.sdk.language": "python",
+    #             "service.name": "my-application",
+    #             "cloud.resource_id": "example-resource",
+    #             "aws.log.group.names": "example-log-group",
+    #             "aws.ai.agent.type": "default",
+    #             "telemetry.sdk.version": "1.x.x",
+    #             "telemetry.auto.version": "0.x.x",
+    #             "telemetry.sdk.name": "opentelemetry"
+    #         }
+    #     },
+    #     "scope": {"name": "example.instrumentation.library"},
+    #     "timeUnixNano": 1234567890123456789,
+    #     "observedTimeUnixNano": 1234567890987654321,
+    #     "severityNumber": 9,
+    #     "body": {...},
+    #     "attributes": {...},
+    #     "flags": 1,
+    #     "traceId": "abcd1234efgh5678ijkl9012mnop3456",
+    #     "spanId": "1234abcd5678efgh"
+    # }
+    # 2000 might be a bit of an overestimate but it's better to overestimate the size of the log
+    # and suffer a small performance impact with batching than it is to underestimate and risk
+    # a large log being dropped when sent to the AWS otlp endpoint.
+    _BASE_LOG_BUFFER_BYTE_SIZE = 2000
+
+    _MAX_LOG_REQUEST_BYTE_SIZE = (
+        1048576  # Maximum uncompressed/unserialized bytes / request -
+        # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
+    )
+
+    def __init__(
+        self,
+        exporter: OTLPAwsLogExporter,
+        schedule_delay_millis: Optional[float] = None,
+        max_export_batch_size: Optional[int] = None,
+        export_timeout_millis: Optional[float] = None,
+        max_queue_size: Optional[int] = None,
+    ):
+
+        super().__init__(
+            exporter=exporter,
+            schedule_delay_millis=schedule_delay_millis,
+            max_export_batch_size=max_export_batch_size,
+            export_timeout_millis=export_timeout_millis,
+            max_queue_size=max_queue_size,
+        )
+
+        self._exporter = exporter
+
+    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
+        """
+        Explicitly overrides upstream _export method to add AWS CloudWatch size-based batching
+        See:
+        https://github.com/open-telemetry/opentelemetry-python/blob/bb21ebd46d070c359eee286c97bdf53bfd06759d/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
+
+        Preserves existing batching behavior but will intermediarly export small log batches if
+        the size of the data in the batch is estimated to be at or above AWS CloudWatch's
+        maximum request size limit of 1 MB.
+
+        - Estimated data size of exported batches will typically be <= 1 MB except for the case below:
+        - If the estimated data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
+        """
+        with self._export_lock:
+            iteration = 0
+            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
+            # once the lock is obtained to see if we still need to make the requested export.
+            while self._should_export_batch(batch_strategy, iteration):
+                iteration += 1
+                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+                try:
+                    batch_length = min(self._max_export_batch_size, len(self._queue))
+                    batch_data_size = 0
+                    batch = []
+
+                    for _ in range(batch_length):
+                        log_data: LogData = self._queue.pop()
+                        log_size = self._estimate_log_size(log_data)
+
+                        if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
+                            self._exporter.export(batch)
+                            batch_data_size = 0
+                            batch = []
+
+                        batch_data_size += log_size
+                        batch.append(log_data)
+
+                    if batch:
+                        self._exporter.export(batch)
+                except Exception as exception:  # pylint: disable=broad-exception-caught
+                    _logger.exception("Exception while exporting logs: %s", exception)
+                detach(token)
+
+    def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:  # pylint: disable=too-many-branches
+        """
+        Estimates the size in bytes of a log by calculating the size of its body and its attributes
+        and adding a buffer amount to account for other log metadata information.
+
+        Features:
+        - Processes complex log structures up to the specified depth limit
+        - Includes cycle detection to prevent processing the same content more than once
+        - Returns truncated calculation if depth limit is exceeded
+
+        We set depth to 3 as this is the minimum required depth to estimate our consolidated Gen AI log events:
+
+        Example structure:
+        {
+            "output": {
+                "messages": [
+                    {
+                        "content": "Hello, World!",
+                        "role": "assistant"
+                    }
+                ]
+            },
+            "input": {
+                "messages": [
+                    {
+                        "content": "Say Hello, World!",
+                        "role": "user"
+                    }
+                ]
+            }
+        }
+
+        Args:
+            log: The Log object to calculate size for
+            depth: Maximum depth to traverse in nested structures (default: 3)
+
+        Returns:
+            int: The estimated size of the log object in bytes
+        """
+
+        # Queue contains tuples of (log_content, depth) where:
+        # - log_content is the current piece of log data being processed
+        # - depth tracks how many levels deep we've traversed to reach this content
+        # - body starts at depth 0 since it's an AnyValue object
+        # - Attributes start at depth -1 since it's a Mapping[str, AnyValue] - when traversed, we will
+        #   start processing its keys at depth 0
+        queue = [(log.log_record.body, 0), (log.log_record.attributes, -1)]
+
+        # Track visited complex log contents to avoid calculating the same one more than once
+        visited = set()
+
+        size: int = self._BASE_LOG_BUFFER_BYTE_SIZE
+
+        while queue:
+            new_queue = []
+
+            for data in queue:
+                # small optimization, can stop calculating the size once it reaches the 1 MB limit.
+                if size >= self._MAX_LOG_REQUEST_BYTE_SIZE:
+                    return size
+
+                next_val, current_depth = data
+
+                if next_val is None:
+                    continue
+
+                if isinstance(next_val, bytes):
+                    size += len(next_val)
+                    continue
+
+                if isinstance(next_val, (str, float, int, bool)):
+                    size += AwsCloudWatchOtlpBatchLogRecordProcessor._estimate_utf8_size(str(next_val))
+                    continue
+
+                # next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"]
+                # See: https://github.com/open-telemetry/opentelemetry-python/blob/\
+                # 9426d6da834cfb4df7daedd4426bba0aa83165b5/opentelemetry-api/src/opentelemetry/util/types.py#L20
+                if current_depth <= depth:
+                    obj_id = id(
+                        next_val
+                    )  # Guaranteed to be unique, see: https://www.w3schools.com/python/ref_func_id.asp
+                    if obj_id in visited:
+                        continue
+                    visited.add(obj_id)
+
+                    if isinstance(next_val, Sequence):
+                        for content in next_val:
+                            new_queue.append((cast(AnyValue, content), current_depth + 1))
+
+                    if isinstance(next_val, Mapping):
+                        for key, content in next_val.items():
+                            size += len(key)
+                            new_queue.append((content, current_depth + 1))
+                else:
+                    _logger.debug(
+                        "Max log depth of %s exceeded. Log data size will not be accurately calculated.", depth
+                    )
+
+            queue = new_queue
+
+        return size
+
+    @staticmethod
+    def _estimate_utf8_size(s: str):
+        ascii_count = 0
+        non_ascii_count = 0
+
+        for char in s:
+            if ord(char) < 128:
+                ascii_count += 1
+            else:
+                non_ascii_count += 1
+
+        # Estimate: ASCII chars (1 byte) + upper bound of non-ASCII chars 4 bytes
+        return ascii_count + (non_ascii_count * 4)
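To make the batching invariant concrete, here is a small worked example of the size-based splitting performed in `_export`, using made-up per-log size estimates (the `sizes` values are hypothetical):

```python
# Worked example of the size-based splitting performed in _export (illustrative only).
MAX_LOG_REQUEST_BYTE_SIZE = 1_048_576  # 1 MB, as defined on the processor above

sizes = [400_000, 400_000, 400_000, 1_200_000]  # hypothetical _estimate_log_size results, in bytes

batches, current, current_size = [], [], 0
for size in sizes:
    if current and current_size + size > MAX_LOG_REQUEST_BYTE_SIZE:
        batches.append(current)
        current, current_size = [], 0
    current.append(size)
    current_size += size
if current:
    batches.append(current)

print(batches)  # [[400000, 400000], [400000], [1200000]]
# The 1.2 MB log ends up alone in its batch, matching the "> 1 MB implies a batch of exactly one log" invariant.
```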
