Commit 3573212

Merge branch 'main' into dependabot/pip/contract-tests/images/mock-collector/protobuf-5.29.5
2 parents ad591ae + 80a9fd3

File tree

9 files changed: +956 −29 lines

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 34 additions & 16 deletions
@@ -1,9 +1,10 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
+import logging
 import os
 import re
-from logging import NOTSET, Logger, getLogger
+from logging import Logger, getLogger
 from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union

 from importlib_metadata import version
@@ -22,12 +23,18 @@
     AwsMetricAttributesSpanExporterBuilder,
 )
 from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
+
+# pylint: disable=line-too-long
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
+    AwsCloudWatchOtlpBatchLogRecordProcessor,
+)
 from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
 from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
 from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
 from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
 from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
+from opentelemetry._events import set_event_logger_provider
 from opentelemetry._logs import get_logger_provider, set_logger_provider
 from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
@@ -42,7 +49,9 @@
     _import_id_generator,
     _import_sampler,
     _OTelSDKConfigurator,
+    _patch_basic_config,
 )
+from opentelemetry.sdk._events import EventLoggerProvider
 from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
 from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
 from opentelemetry.sdk.environment_variables import (
@@ -197,26 +206,28 @@ def _initialize_components():


 def _init_logging(
-    exporters: Dict[str, Type[LogExporter]],
-    resource: Resource = None,
+    exporters: dict[str, Type[LogExporter]],
+    resource: Optional[Resource] = None,
+    setup_logging_handler: bool = True,
 ):
-
-    # Provides a default OTLP log exporter when none is specified.
-    # This is the behavior for the logs exporters for other languages.
-    if not exporters:
-        exporters = {"otlp": OTLPLogExporter}
-
     provider = LoggerProvider(resource=resource)
     set_logger_provider(provider)

     for _, exporter_class in exporters.items():
-        exporter_args: Dict[str, any] = {}
-        log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
-        provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
+        exporter_args = {}
+        log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args))
+        log_processor = _customize_log_record_processor(log_exporter)
+        provider.add_log_record_processor(log_processor)
+
+    event_logger_provider = EventLoggerProvider(logger_provider=provider)
+    set_event_logger_provider(event_logger_provider)

-    handler = LoggingHandler(level=NOTSET, logger_provider=provider)
+    if setup_logging_handler:
+        _patch_basic_config()

-    getLogger().addHandler(handler)
+        # Add OTel handler
+        handler = LoggingHandler(level=logging.NOTSET, logger_provider=provider)
+        logging.getLogger().addHandler(handler)


 def _init_tracing(
@@ -417,7 +428,14 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
     return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()


-def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> LogExporter:
+def _customize_log_record_processor(log_exporter: LogExporter):
+    if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
+        return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)
+
+    return BatchLogRecordProcessor(exporter=log_exporter)
+
+
+def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
     logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)

     if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
@@ -586,7 +604,7 @@ def _is_lambda_environment():
     return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


-def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> bool:
+def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool:
     """Is the given endpoint an AWS OTLP endpoint?"""

     pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN
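
For orientation, the dispatch added in _customize_log_record_processor is small enough to exercise on its own. The sketch below mirrors that selection logic with stand-in classes; the stub types and the AGENT_OBSERVABILITY_ENABLED variable name are assumptions for illustration, not the distro's actual API:

import os

# Hypothetical stand-ins for OTLPAwsLogExporter, AwsCloudWatchOtlpBatchLogRecordProcessor,
# and BatchLogRecordProcessor -- illustration only.
class StubAwsLogExporter: ...

class StubCloudWatchBatchProcessor:
    def __init__(self, exporter): self.exporter = exporter

class StubBatchProcessor:
    def __init__(self, exporter): self.exporter = exporter

def is_agent_observability_enabled() -> bool:
    # The distro gates this on an environment variable; the exact name used here is an assumption.
    return os.environ.get("AGENT_OBSERVABILITY_ENABLED", "false").lower() == "true"

def customize_log_record_processor(log_exporter):
    # Same shape as _customize_log_record_processor above: the size-aware CloudWatch
    # processor is chosen only for the AWS OTLP exporter with agent observability on.
    if isinstance(log_exporter, StubAwsLogExporter) and is_agent_observability_enabled():
        return StubCloudWatchBatchProcessor(exporter=log_exporter)
    return StubBatchProcessor(exporter=log_exporter)

os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
print(type(customize_log_record_processor(StubAwsLogExporter())).__name__)
# -> StubCloudWatchBatchProcessor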
aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py

Lines changed: 249 additions & 0 deletions
@@ -0,0 +1,249 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
+
+import logging
+from typing import Mapping, Optional, Sequence, cast
+
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
+from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
+from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
+from opentelemetry.util.types import AnyValue
+
+_logger = logging.getLogger(__name__)
+
+
+class AwsCloudWatchOtlpBatchLogRecordProcessor(BatchLogRecordProcessor):
+    """
+    Custom implementation of BatchLogRecordProcessor that manages log record batching
+    with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.
+
+    This processor still exports all logs, but rather than doing exactly one export per batch,
+    it estimates log sizes and performs multiple batch exports,
+    where each exported batch satisfies an additional constraint:
+
+    If the batch to be exported would have a data size > 1 MB:
+        The batch is split into multiple exports of sub-batches, each of data size <= 1 MB.
+
+    The one exception is a sub-batch of data size > 1 MB; such a sub-batch contains exactly 1 log.
+    """
+
+    # OTel log events include fixed metadata attributes, so the metadata size can be
+    # estimated, on a best-effort basis, as:
+    # service.name (255 chars) + cloud.resource_id (max ARN length) + telemetry.xxx (~20 chars) +
+    # common attributes (255 chars) +
+    # scope + flags + traceId + spanId + numeric/timestamp fields + ...
+    # Example log structure:
+    # {
+    #     "resource": {
+    #         "attributes": {
+    #             "aws.local.service": "example-service123",
+    #             "telemetry.sdk.language": "python",
+    #             "service.name": "my-application",
+    #             "cloud.resource_id": "example-resource",
+    #             "aws.log.group.names": "example-log-group",
+    #             "aws.ai.agent.type": "default",
+    #             "telemetry.sdk.version": "1.x.x",
+    #             "telemetry.auto.version": "0.x.x",
+    #             "telemetry.sdk.name": "opentelemetry"
+    #         }
+    #     },
+    #     "scope": {"name": "example.instrumentation.library"},
+    #     "timeUnixNano": 1234567890123456789,
+    #     "observedTimeUnixNano": 1234567890987654321,
+    #     "severityNumber": 9,
+    #     "body": {...},
+    #     "attributes": {...},
+    #     "flags": 1,
+    #     "traceId": "abcd1234efgh5678ijkl9012mnop3456",
+    #     "spanId": "1234abcd5678efgh"
+    # }
+    # 2000 might be a bit of an overestimate, but it is better to overestimate the size of the log
+    # and suffer a small performance impact with batching than to underestimate and risk
+    # a large log being dropped when sent to the AWS OTLP endpoint.
+    _BASE_LOG_BUFFER_BYTE_SIZE = 2000
+
+    _MAX_LOG_REQUEST_BYTE_SIZE = (
+        1048576  # Maximum uncompressed/unserialized bytes / request -
+        # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
+    )
+
+    def __init__(
+        self,
+        exporter: OTLPAwsLogExporter,
+        schedule_delay_millis: Optional[float] = None,
+        max_export_batch_size: Optional[int] = None,
+        export_timeout_millis: Optional[float] = None,
+        max_queue_size: Optional[int] = None,
+    ):
+
+        super().__init__(
+            exporter=exporter,
+            schedule_delay_millis=schedule_delay_millis,
+            max_export_batch_size=max_export_batch_size,
+            export_timeout_millis=export_timeout_millis,
+            max_queue_size=max_queue_size,
+        )
+
+        self._exporter = exporter
+
+    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
+        """
+        Explicitly overrides the upstream _export method to add AWS CloudWatch size-based batching.
+        See:
+        https://github.com/open-telemetry/opentelemetry-python/blob/bb21ebd46d070c359eee286c97bdf53bfd06759d/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
+
+        Preserves the existing batching behavior, but exports smaller intermediate log batches whenever
+        the estimated size of the data in the batch reaches or exceeds AWS CloudWatch's
+        maximum request size limit of 1 MB.
+
+        - The estimated data size of exported batches will typically be <= 1 MB, except for the case below:
+        - If the estimated data size of an exported batch is ever > 1 MB, then that batch is guaranteed to contain exactly 1 log.
+        """
+        with self._export_lock:
+            iteration = 0
+            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
+            # once the lock is obtained to see if we still need to make the requested export.
+            while self._should_export_batch(batch_strategy, iteration):
+                iteration += 1
+                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+                try:
+                    batch_length = min(self._max_export_batch_size, len(self._queue))
+                    batch_data_size = 0
+                    batch = []
+
+                    for _ in range(batch_length):
+                        log_data: LogData = self._queue.pop()
+                        log_size = self._estimate_log_size(log_data)
+
+                        if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
+                            self._exporter.export(batch)
+                            batch_data_size = 0
+                            batch = []
+
+                        batch_data_size += log_size
+                        batch.append(log_data)
+
+                    if batch:
+                        self._exporter.export(batch)
+                except Exception as exception:  # pylint: disable=broad-exception-caught
+                    _logger.exception("Exception while exporting logs: %s", exception)
+                detach(token)
+
+    def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:  # pylint: disable=too-many-branches
+        """
+        Estimates the size in bytes of a log by calculating the size of its body and its attributes
+        and adding a buffer amount to account for other log metadata information.
+
+        Features:
+        - Processes complex log structures up to the specified depth limit
+        - Includes cycle detection to prevent processing the same content more than once
+        - Returns a truncated calculation if the depth limit is exceeded
+
+        We set depth to 3 as this is the minimum required depth to estimate our consolidated Gen AI log events:
+
+        Example structure:
+        {
+            "output": {
+                "messages": [
+                    {
+                        "content": "Hello, World!",
+                        "role": "assistant"
+                    }
+                ]
+            },
+            "input": {
+                "messages": [
+                    {
+                        "content": "Say Hello, World!",
+                        "role": "user"
+                    }
+                ]
+            }
+        }
+
+        Args:
+            log: The Log object to calculate size for
+            depth: Maximum depth to traverse in nested structures (default: 3)
+
+        Returns:
+            int: The estimated size of the log object in bytes
+        """
+
+        # Queue contains tuples of (log_content, depth) where:
+        # - log_content is the current piece of log data being processed
+        # - depth tracks how many levels deep we've traversed to reach this content
+        # - body starts at depth 0 since it's an AnyValue object
+        # - attributes start at depth -1 since it's a Mapping[str, AnyValue] - when traversed, we will
+        #   start processing its keys at depth 0
+        queue = [(log.log_record.body, 0), (log.log_record.attributes, -1)]
+
+        # Track visited complex log contents to avoid calculating the same one more than once
+        visited = set()
+
+        size: int = self._BASE_LOG_BUFFER_BYTE_SIZE
+
+        while queue:
+            new_queue = []
+
+            for data in queue:
+                # Small optimization: we can stop calculating the size once it reaches the 1 MB limit.
+                if size >= self._MAX_LOG_REQUEST_BYTE_SIZE:
+                    return size
+
+                next_val, current_depth = data
+
+                if next_val is None:
+                    continue
+
+                if isinstance(next_val, bytes):
+                    size += len(next_val)
+                    continue
+
+                if isinstance(next_val, (str, float, int, bool)):
+                    size += AwsCloudWatchOtlpBatchLogRecordProcessor._estimate_utf8_size(str(next_val))
+                    continue
+
+                # next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"]
+                # See: https://github.com/open-telemetry/opentelemetry-python/blob/\
+                # 9426d6da834cfb4df7daedd4426bba0aa83165b5/opentelemetry-api/src/opentelemetry/util/types.py#L20
+                if current_depth <= depth:
+                    obj_id = id(
+                        next_val
+                    )  # Guaranteed to be unique, see: https://www.w3schools.com/python/ref_func_id.asp
+                    if obj_id in visited:
+                        continue
+                    visited.add(obj_id)
+
+                    if isinstance(next_val, Sequence):
+                        for content in next_val:
+                            new_queue.append((cast(AnyValue, content), current_depth + 1))
+
+                    if isinstance(next_val, Mapping):
+                        for key, content in next_val.items():
+                            size += len(key)
+                            new_queue.append((content, current_depth + 1))
+                else:
+                    _logger.debug(
+                        "Max log depth of %s exceeded. Log data size will not be accurately calculated.", depth
+                    )
+
+            queue = new_queue
+
+        return size
+
+    @staticmethod
+    def _estimate_utf8_size(s: str):
+        ascii_count = 0
+        non_ascii_count = 0
+
+        for char in s:
+            if ord(char) < 128:
+                ascii_count += 1
+            else:
+                non_ascii_count += 1
+
+        # Estimate: ASCII chars (1 byte) + upper bound of non-ASCII chars at 4 bytes
+        return ascii_count + (non_ascii_count * 4)
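
The size-capped splitting in _export can be exercised in isolation. Below is a minimal sketch of the same greedy loop over pre-computed size estimates; the helper name split_batches and the toy sizes are illustrative, not part of the distro:

from typing import List

MAX_REQUEST_BYTES = 1048576  # the 1 MB CloudWatch OTLP request cap cited above

def split_batches(sizes: List[int], max_batch_len: int) -> List[List[int]]:
    # Mirrors the _export loop: greedily fill a batch until adding the next log
    # would push the estimated size past the cap, then flush and start over.
    # A single oversized log still ships, alone in its own batch.
    batches, batch, batch_bytes = [], [], 0
    for log_size in sizes[:max_batch_len]:
        if batch and batch_bytes + log_size > MAX_REQUEST_BYTES:
            batches.append(batch)
            batch, batch_bytes = [], 0
        batch.append(log_size)
        batch_bytes += log_size
    if batch:
        batches.append(batch)
    return batches

# Three ~600 KB logs cannot pair up under 1 MB, so each goes out alone:
print(split_batches([600_000, 600_000, 600_000], max_batch_len=512))
# -> [[600000], [600000], [600000]]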
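
The upper-bound arithmetic in _estimate_utf8_size is also easy to check by hand: every ASCII character counts as 1 byte and every non-ASCII character as the 4-byte UTF-8 worst case, so the estimate can overshoot but never undercount. A standalone restatement:

def estimate_utf8_size(s: str) -> int:
    # Same heuristic as the processor: 1 byte per ASCII char,
    # 4 bytes (UTF-8 worst case) per non-ASCII char.
    ascii_count = sum(1 for ch in s if ord(ch) < 128)
    return ascii_count + (len(s) - ascii_count) * 4

print(estimate_utf8_size("héllo"))      # 4 ASCII + 1 * 4 = 8
print(len("héllo".encode("utf-8")))     # 6 actual bytes; the estimate never undercounts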
