Skip to content

Commit c34f3b9

Browse files
committed
add logs pipeline
1 parent 779e89b commit c34f3b9

File tree

6 files changed

+810
-5
lines changed

6 files changed

+810
-5
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import os
55
import re
66
from logging import NOTSET, Logger, getLogger
7-
from typing import ClassVar, Dict, List, Type, Union
7+
from typing import ClassVar, Dict, List, Optional, Type, Union
88

99
from importlib_metadata import version
1010
from typing_extensions import override
@@ -22,6 +22,7 @@
2222
AwsMetricAttributesSpanExporterBuilder,
2323
)
2424
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
25+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import AwsBatchLogRecordProcessor
2526
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
2627
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
2728
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
@@ -181,7 +182,9 @@ def _init_logging(
181182

182183
# Provides a default OTLP log exporter when none is specified.
183184
# This is the behavior for the logs exporters for other languages.
184-
if not exporters:
185+
logs_exporter = os.environ.get("OTEL_LOGS_EXPORTER")
186+
187+
if not exporters and logs_exporter and (logs_exporter.lower() != "none"):
185188
exporters = {"otlp": OTLPLogExporter}
186189

187190
provider = LoggerProvider(resource=resource)
@@ -190,7 +193,11 @@ def _init_logging(
190193
for _, exporter_class in exporters.items():
191194
exporter_args: Dict[str, any] = {}
192195
log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
193-
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
196+
197+
if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
198+
provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter))
199+
else:
200+
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
194201

195202
handler = LoggingHandler(level=NOTSET, logger_provider=provider)
196203

@@ -532,7 +539,7 @@ def _is_lambda_environment():
532539
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ
533540

534541

535-
def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> bool:
542+
def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool:
536543
"""Is the given endpoint an AWS OTLP endpoint?"""
537544

538545
pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import logging
2+
from typing import Mapping, Optional, Sequence, cast
3+
4+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
5+
from opentelemetry.context import (
6+
_SUPPRESS_INSTRUMENTATION_KEY,
7+
attach,
8+
detach,
9+
set_value,
10+
)
11+
from opentelemetry.sdk._logs import LogData
12+
from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
13+
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
14+
from opentelemetry.util.types import AnyValue
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
class AwsBatchLogRecordProcessor(BatchLogRecordProcessor):
    """Batch log processor aware of AWS CloudWatch Logs' OTLP request size limit.

    Preserves the standard ``BatchLogRecordProcessor`` batching behavior, but
    intermediately exports smaller sub-batches whenever the estimated payload
    size would reach or exceed CloudWatch's 1 MB maximum request size.
    """

    # Buffer size in bytes to account for log metadata not included in the body size calculation.
    _BASE_LOG_BUFFER_BYTE_SIZE = 2000
    # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
    _MAX_LOG_REQUEST_BYTE_SIZE = 1048576

    def __init__(
        self,
        exporter: OTLPAwsLogExporter,
        schedule_delay_millis: Optional[float] = None,
        max_export_batch_size: Optional[int] = None,
        export_timeout_millis: Optional[float] = None,
        max_queue_size: Optional[int] = None,
    ):
        super().__init__(
            exporter=exporter,
            schedule_delay_millis=schedule_delay_millis,
            max_export_batch_size=max_export_batch_size,
            export_timeout_millis=export_timeout_millis,
            max_queue_size=max_queue_size,
        )

        # Keep a typed reference so we can call the AWS-specific set_gen_ai_log_flag().
        self._exporter = exporter

    # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
        """
        Preserves existing batching behavior but will intermediately export small log batches if
        the size of the data in the batch is at or above AWS CloudWatch's maximum request size limit of 1 MB.

        - Data size of exported batches will ALWAYS be <= 1 MB except for the case below:
        - If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
        """
        with self._export_lock:
            iteration = 0
            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
            # once the lock is obtained to see if we still need to make the requested export.
            while self._should_export_batch(batch_strategy, iteration):
                iteration += 1
                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
                try:
                    batch_length = min(self._max_export_batch_size, len(self._queue))
                    batch_data_size = 0
                    batch = []

                    for _ in range(batch_length):
                        log_data: LogData = self._queue.pop()
                        log_size = self._BASE_LOG_BUFFER_BYTE_SIZE + self._get_any_value_size(
                            log_data.log_record.body
                        )

                        # Flush the accumulated batch before this record would push it past the limit.
                        if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
                            # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                            if batch_data_size > self._MAX_LOG_REQUEST_BYTE_SIZE:
                                if self._is_gen_ai_log(batch[0]):
                                    self._exporter.set_gen_ai_log_flag()

                            self._exporter.export(batch)
                            batch_data_size = 0
                            batch = []

                        batch_data_size += log_size
                        batch.append(log_data)

                    if batch:
                        # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                        if batch_data_size > self._MAX_LOG_REQUEST_BYTE_SIZE:
                            if self._is_gen_ai_log(batch[0]):
                                self._exporter.set_gen_ai_log_flag()

                        self._exporter.export(batch)
                except Exception:  # pylint: disable=broad-exception-caught
                    # logger.exception() already records the traceback and message;
                    # no need to concatenate str(e) into the format string.
                    _logger.exception("Exception while exporting logs")
                finally:
                    # Always restore the context, even if logging the failure raises.
                    detach(token)

    def _get_any_value_size(self, val: AnyValue, depth: int = 3) -> int:
        """
        Only used to indicate whether we should export a batch log size of 1 or not.
        Calculates the size in bytes of an AnyValue object.
        Will process complex AnyValue structures up to the specified depth limit.
        If the depth limit of the AnyValue structure is exceeded, returns 0.

        Args:
            val: The AnyValue object to calculate size for
            depth: Maximum depth to traverse in nested structures (default: 3)

        Returns:
            int: Total size of the AnyValue object in bytes
        """
        # Use a stack to prevent excessive recursive calls.
        stack = [(val, 0)]
        size: int = 0

        while stack:
            # Small optimization: we can stop calculating the size once it reaches the 1 MB limit.
            if size >= self._MAX_LOG_REQUEST_BYTE_SIZE:
                return size

            next_val, current_depth = stack.pop()

            if isinstance(next_val, (str, bytes)):
                size += len(next_val)
                continue

            # bool must be tested before int (bool is a subclass of int);
            # 4/5 are the lengths of the JSON literals "true"/"false".
            if isinstance(next_val, bool):
                size += 4 if next_val else 5
                continue

            if isinstance(next_val, (float, int)):
                size += len(str(next_val))
                continue

            if current_depth <= depth:
                if isinstance(next_val, Sequence):
                    for content in next_val:
                        stack.append((cast(AnyValue, content), current_depth + 1))

                if isinstance(next_val, Mapping):
                    for key, content in next_val.items():
                        size += len(key)
                        stack.append((content, current_depth + 1))
            else:
                _logger.debug("Max log depth exceeded. Log data size will not be accurately calculated.")
                return 0

        return size

    @staticmethod
    def _is_gen_ai_log(log_data: LogData) -> bool:
        """
        Is the log a Gen AI log event?
        """
        gen_ai_instrumentations = {
            "openinference.instrumentation.langchain",
            "openinference.instrumentation.crewai",
            "opentelemetry.instrumentation.langchain",
            "crewai.telemetry",
            "openlit.otel.tracing",
        }

        return log_data.instrumentation_scope.name in gen_ai_instrumentations

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,41 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from typing import Dict, Optional
4+
import gzip
5+
import logging
6+
from io import BytesIO
7+
from time import sleep
8+
from typing import Dict, Optional, Sequence
9+
10+
import requests
511

612
from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
13+
from opentelemetry.exporter.otlp.proto.common._internal import (
14+
_create_exp_backoff_generator,
15+
)
16+
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
717
from opentelemetry.exporter.otlp.proto.http import Compression
818
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
19+
from opentelemetry.sdk._logs import (
20+
LogData,
21+
)
22+
from opentelemetry.sdk._logs.export import (
23+
LogExportResult,
24+
)
25+
26+
_logger = logging.getLogger(__name__)
927

1028

1129
class OTLPAwsLogExporter(OTLPLogExporter):
30+
_LARGE_LOG_HEADER = "x-aws-truncatable-fields"
31+
_LARGE_GEN_AI_LOG_PATH_HEADER = (
32+
"\\$['resourceLogs'][0]['scopeLogs'][0]['logRecords'][0]['body']"
33+
"['kvlistValue']['values'][*]['value']['kvlistValue']['values'][*]"
34+
"['value']['arrayValue']['values'][*]['kvlistValue']['values'][*]"
35+
"['value']['stringValue']"
36+
)
37+
_RETRY_AFTER_HEADER = "Retry-After" # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
38+
1239
def __init__(
1340
self,
1441
endpoint: Optional[str] = None,
@@ -18,6 +45,7 @@ def __init__(
1845
headers: Optional[Dict[str, str]] = None,
1946
timeout: Optional[int] = None,
2047
):
48+
self._gen_ai_log_flag = False
2149
self._aws_region = None
2250

2351
if endpoint:
@@ -34,3 +62,134 @@ def __init__(
3462
compression=Compression.Gzip,
3563
session=AwsAuthSession(aws_region=self._aws_region, service="logs"),
3664
)
65+
66+
# https://github.com/open-telemetry/opentelemetry-python/blob/main/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167
def export(self, batch: Sequence[LogData]) -> LogExportResult:
    """
    Exports the given batch of OTLP log data.
    Behaviors of how this export will work -

    1. Always compresses the serialized data into gzip before sending.

    2. If self._gen_ai_log_flag is enabled, the log data is > 1 MB
    and the assumption is that the log is a normalized gen.ai LogEvent.
        - inject the {LARGE_LOG_HEADER} into the header.

    3. Retry behavior is now the following:
        - if the response contains a status code that is retryable and the response contains Retry-After in its
        headers, the serialized data will be exported after that set delay

        - if the response does not contain that Retry-After header, default back to the current iteration of the
        exponential backoff delay
    """

    if self._shutdown:
        _logger.warning("Exporter already shutdown, ignoring batch")
        return LogExportResult.FAILURE

    serialized_data = encode_logs(batch).SerializeToString()

    gzip_data = BytesIO()
    with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
        gzip_stream.write(serialized_data)

    data = gzip_data.getvalue()

    backoff = _create_exp_backoff_generator(max_value=self._MAX_RETRY_TIMEOUT)

    while True:
        resp = self._send(data)

        if resp.ok:
            # The gen-AI flag applies only to the batch just delivered; clear it so
            # subsequent normal-sized batches do not inherit the truncation header
            # (it was previously only cleared on the failure paths).
            self._gen_ai_log_flag = False
            return LogExportResult.SUCCESS

        if not self._retryable(resp):
            _logger.error(
                "Failed to export logs batch code: %s, reason: %s",
                resp.status_code,
                resp.text,
            )
            self._gen_ai_log_flag = False
            return LogExportResult.FAILURE

        # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
        maybe_retry_after = resp.headers.get(self._RETRY_AFTER_HEADER, None)

        # Set the next retry delay to the value of the Retry-After response in the headers.
        # If Retry-After is not present in the headers, default to the next iteration of the
        # exponential backoff strategy.
        delay = self._parse_retryable_header(maybe_retry_after)

        if delay == -1:
            delay = next(backoff, self._MAX_RETRY_TIMEOUT)

        # The backoff generator yields its max value when exhausted; treat that as "give up".
        if delay == self._MAX_RETRY_TIMEOUT:
            _logger.error(
                "Transient error %s encountered while exporting logs batch. "
                "No Retry-After header found and all backoff retries exhausted. "
                "Logs will not be exported.",
                resp.reason,
            )
            self._gen_ai_log_flag = False
            return LogExportResult.FAILURE

        _logger.warning(
            "Transient error %s encountered while exporting logs batch, retrying in %ss.",
            resp.reason,
            delay,
        )

        sleep(delay)
def set_gen_ai_log_flag(self):
    """Mark the pending batch as containing an oversized Gen AI log record.

    Records that the current batch holds a generative-AI log record larger
    than the CloudWatch Logs request limit (1 MB); _send() then attaches the
    truncatable-fields header to the outgoing request.
    """
    self._gen_ai_log_flag = True
def _send(self, serialized_data: bytes):
    """POST the serialized (gzip-compressed) payload to the OTLP logs endpoint.

    Retries exactly once on a connection error. When the gen-AI flag is set,
    attaches the truncatable-fields header so CloudWatch may truncate the
    oversized record instead of rejecting the request.

    Args:
        serialized_data: The compressed, serialized OTLP logs payload.

    Returns:
        The ``requests.Response`` from the (possibly retried) POST.
    """
    # Compute the header mapping once; both the first attempt and the single
    # retry must send identical requests.
    headers = {self._LARGE_LOG_HEADER: self._LARGE_GEN_AI_LOG_PATH_HEADER} if self._gen_ai_log_flag else None

    def _post():
        # One-line helper so the retry path cannot drift from the first attempt
        # (previously the six-argument call was duplicated verbatim).
        return self._session.post(
            url=self._endpoint,
            headers=headers,
            data=serialized_data,
            verify=self._certificate_file,
            timeout=self._timeout,
            cert=self._client_cert,
        )

    try:
        return _post()
    except ConnectionError:
        # NOTE(review): requests raises requests.exceptions.ConnectionError, which is
        # NOT a subclass of the builtin ConnectionError caught here — confirm whether
        # this except clause ever fires, or whether requests.exceptions.ConnectionError
        # was intended.
        return _post()
@staticmethod
def _retryable(resp: requests.Response) -> bool:
    """Return True when the response status warrants a retry.

    HTTP 429 (throttled) and 503 (unavailable) are retryable per the OTLP/HTTP
    throttling spec, in addition to whatever the base OTLP exporter already
    treats as retryable.
    """
    if resp.status_code in (429, 503):
        return True
    return OTLPLogExporter._retryable(resp)
182+
@staticmethod
183+
def _parse_retryable_header(retry_header: Optional[str]) -> float:
184+
"""
185+
Converts the given retryable header into a delay in seconds, returns -1 if there's no header
186+
or error with the parsing
187+
"""
188+
if not retry_header:
189+
return -1
190+
191+
try:
192+
val = float(retry_header)
193+
return val if val >= 0 else -1
194+
except ValueError:
195+
return -1

0 commit comments

Comments
 (0)