Add AwsBatchLogProcessor and OtlpAwsLogExporter Logs Pipeline #402
Merged
39 commits
c34f3b9 add logs pipeline (liustve)
010e7df add logs pipeline (liustve)
24f4308 Merge remote-tracking branch 'origin/mainline' into logs-mainline (liustve)
b75fe99 linting fix (liustve)
d588605 linting fix (liustve)
c78aca5 linting fix (liustve)
12eca32 linting fix (liustve)
83ec370 linting fix (liustve)
79bbf46 linting fix (liustve)
b6e1b97 remove gen ai handling logic (liustve)
17d0f90 fixed linting (liustve)
3d12858 refactor _init_logging to 1.33.1 version (liustve)
7f90bc7 refactored batch log record processor (liustve)
fdddb7a Merge remote-tracking branch 'upstream/main' into logs-mainline (liustve)
4b7bb0e linting (liustve)
8c64adb lint fix (liustve)
01e3fd8 update configuration and tests (liustve)
2f0268c lint fix (liustve)
7dbcb7e linting fix (liustve)
886b009 Merge remote-tracking branch 'upstream/main' into logs-mainline (liustve)
93e2836 add cycle detection (liustve)
cb21d39 add comment about termination of loop (liustve)
651f283 Merge branch 'main' into logs-mainline (liustve)
ff2fb5d refactored otlp aws log exporter, add comments aws batch log processor (liustve)
153679a Merge branch 'main' into logs-mainline (liustve)
bce91dc linting fix (liustve)
8479ac9 Merge branch 'logs-mainline' of https://github.com/liustve/aws-otel-p… (liustve)
76e4b47 remove shut down check before sleep (liustve)
6dd6a67 add better estimation for non-ascii characters (liustve)
502eb01 linting + formatting fix (liustve)
b30ad4f fix unit test (liustve)
8b7e671 linting fix (liustve)
dc98cf8 add interruptible shutdown (liustve)
3450a11 fix sleep unit tests + renaming aws batch log processor (liustve)
7a83e92 linting fix (liustve)
a38d43d fix test (liustve)
726a9a8 linting fix (liustve)
fc77123 linting fix (liustve)
f571ffb linting fix (liustve)
170 changes: 170 additions & 0 deletions
.../src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import List, Mapping, Optional, Sequence, cast

from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.util.types import AnyValue

_logger = logging.getLogger(__name__)


class AwsBatchLogRecordProcessor(BatchLogRecordProcessor):

    _BASE_LOG_BUFFER_BYTE_SIZE = (
        1000  # Buffer size in bytes to account for log metadata not included in the body or attribute size calculation
    )
    _MAX_LOG_REQUEST_BYTE_SIZE = (
        1048576  # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
    )

    def __init__(
        self,
        exporter: OTLPAwsLogExporter,
        schedule_delay_millis: Optional[float] = None,
        max_export_batch_size: Optional[int] = None,
        export_timeout_millis: Optional[float] = None,
        max_queue_size: Optional[int] = None,
    ):
        super().__init__(
            exporter=exporter,
            schedule_delay_millis=schedule_delay_millis,
            max_export_batch_size=max_export_batch_size,
            export_timeout_millis=export_timeout_millis,
            max_queue_size=max_queue_size,
        )

        self._exporter = exporter

    # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
        """
        Preserves the existing batching behavior, but exports smaller intermediate batches whenever
        the data size of a batch reaches or exceeds AWS CloudWatch's maximum request size limit of 1 MB.

        - The data size of exported batches will ALWAYS be <= 1 MB, except for the case below:
        - If the data size of an exported batch is ever > 1 MB, then the batch length is guaranteed to be 1.
        """
        with self._export_lock:
            iteration = 0
            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
            # once the lock is obtained to see if we still need to make the requested export.
            while self._should_export_batch(batch_strategy, iteration):
                iteration += 1
                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
                try:
                    batch_length = min(self._max_export_batch_size, len(self._queue))
                    batch_data_size = 0
                    batch = []

                    for _ in range(batch_length):
                        log_data: LogData = self._queue.pop()
                        log_size = self._estimate_log_size(log_data)

                        if batch and (
                            batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE
                        ):  # pylint: disable=too-many-nested-blocks
                            # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                            if batch_data_size > self._MAX_LOG_REQUEST_BYTE_SIZE:
                                if self._is_gen_ai_log(batch[0]):
                                    self._exporter.set_gen_ai_log_flag()

                            self._exporter.export(batch)
                            batch_data_size = 0
                            batch = []

                        batch_data_size += log_size
                        batch.append(log_data)

                    if batch:
                        # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                        if batch_data_size > self._MAX_LOG_REQUEST_BYTE_SIZE:
                            if self._is_gen_ai_log(batch[0]):
                                self._exporter.set_gen_ai_log_flag()

                        self._exporter.export(batch)
                except Exception as exception:  # pylint: disable=broad-exception-caught
                    _logger.exception("Exception while exporting logs: %s", exception)
                detach(token)

    def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:
        """
        Estimates the size in bytes of a log by calculating the size of its body and its attributes
        and adding a buffer amount to account for other log metadata information.
        Processes complex log structures up to the specified depth limit.
        If the depth limit of the log structure is exceeded, truncates the calculation
        to everything traversed up to that point.

        Args:
            log: The Log object to calculate the size of
            depth: Maximum depth to traverse in nested structures (default: 3)

        Returns:
            int: The estimated size of the log object in bytes
        """

        # Use a queue to prevent excessive recursive calls.
        # We calculate based on the size of the log record body and attributes for the log.
        queue: List[tuple[AnyValue, int]] = [(log.log_record.body, 0), (log.log_record.attributes, -1)]

        size: int = self._BASE_LOG_BUFFER_BYTE_SIZE

        while queue:
            new_queue: List[tuple[AnyValue, int]] = []

            for data in queue:
                # Small optimization: we can stop calculating the size once it reaches the 1 MB limit.
                if size >= self._MAX_LOG_REQUEST_BYTE_SIZE:
                    return size

                next_val, current_depth = data

                if isinstance(next_val, (str, bytes)):
                    size += len(next_val)
                    continue

                if isinstance(next_val, bool):
                    size += 4 if next_val else 5
                    continue

                if isinstance(next_val, (float, int)):
                    size += len(str(next_val))
                    continue

                if current_depth <= depth:
                    if isinstance(next_val, Sequence):
                        for content in next_val:
                            new_queue.append((cast(AnyValue, content), current_depth + 1))

                    if isinstance(next_val, Mapping):
                        for key, content in next_val.items():
                            size += len(key)
                            new_queue.append((content, current_depth + 1))
                else:
                    _logger.debug(
                        "Max log depth of %s exceeded. Log data size will not be accurately calculated.", depth
                    )

            queue = new_queue

        return size

    @staticmethod
    def _is_gen_ai_log(log: LogData) -> bool:
        """
        Is the log a Gen AI log event?
        """
        gen_ai_instrumentations = {
            "openinference.instrumentation.langchain",
            "openinference.instrumentation.crewai",
            "opentelemetry.instrumentation.langchain",
            "crewai.telemetry",
            "openlit.otel.tracing",
        }

        return log.instrumentation_scope.name in gen_ai_instrumentations
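The batching rule in `_export` above (flush the current batch before a log would push it past the 1 MB limit, so only a single oversized log can ever exceed the limit on its own) can be illustrated with a standalone sketch. The function name `split_into_batches` and operating on plain byte counts rather than `LogData` are illustrative simplifications, not part of the PR:

```python
# Illustrative sketch of the batch-splitting rule: flush before adding a log
# would exceed the limit, so a batch over the limit always has length 1.
from typing import List

MAX_BATCH_BYTES = 1048576  # CloudWatch OTLP endpoint request limit (1 MiB)


def split_into_batches(log_sizes: List[int], limit: int = MAX_BATCH_BYTES) -> List[List[int]]:
    batches: List[List[int]] = []
    batch: List[int] = []
    batch_bytes = 0
    for size in log_sizes:
        # Flush first if adding this log would exceed the limit and the batch
        # is non-empty; an oversized log therefore always travels alone.
        if batch and batch_bytes + size > limit:
            batches.append(batch)
            batch = []
            batch_bytes = 0
        batch.append(size)
        batch_bytes += size
    if batch:
        batches.append(batch)
    return batches
```

Three 400 KB logs split into a 800 KB batch and a 400 KB batch, and a 2 MB log is exported by itself, mirroring the guarantee stated in the docstring.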
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
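The depth-limited, queue-based traversal in `_estimate_log_size` above can also be sketched in isolation over plain Python values. This is a simplified sketch, not the PR's code: the name `estimate_size` is hypothetical, the early exit at the 1 MB limit is omitted, and OTel `LogData` is replaced with a `(body, attributes)` pair:

```python
# Simplified sketch of the breadth-first size estimation: primitives are
# counted at any depth; containers are expanded only up to the depth limit.
from typing import Any, List, Mapping, Sequence, Tuple

BASE_OVERHEAD = 1000  # flat buffer for log metadata not counted below


def estimate_size(body: Any, attributes: Mapping, depth: int = 3) -> int:
    size = BASE_OVERHEAD
    # Breadth-first traversal with an explicit queue avoids unbounded
    # recursion on deeply nested payloads.
    queue: List[Tuple[Any, int]] = [(body, 0), (attributes, -1)]
    while queue:
        next_level: List[Tuple[Any, int]] = []
        for value, level in queue:
            if isinstance(value, (str, bytes)):
                size += len(value)
            elif isinstance(value, bool):  # bool before int: bool subclasses int
                size += 4 if value else 5  # len("true") / len("false")
            elif isinstance(value, (int, float)):
                size += len(str(value))
            elif level <= depth:
                if isinstance(value, Mapping):
                    for key, item in value.items():
                        size += len(str(key))
                        next_level.append((item, level + 1))
                elif isinstance(value, Sequence):
                    for item in value:
                        next_level.append((item, level + 1))
            # containers deeper than the limit are silently truncated here
        queue = next_level
    return size
```

A string body plus a one-entry attribute map counts the string, the key, and the stringified value on top of the flat overhead; nesting past the depth limit simply stops contributing, which is why the processor's estimate is documented as a truncated approximation rather than an exact size.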