Add AwsBatchLogProcessor and OtlpAwsLogExporter Logs Pipeline #402
Merged
Changes shown from 22 commits (39 commits total)
c34f3b9 add logs pipeline (liustve)
010e7df add logs pipeline (liustve)
24f4308 Merge remote-tracking branch 'origin/mainline' into logs-mainline (liustve)
b75fe99 linting fix (liustve)
d588605 linting fix (liustve)
c78aca5 linting fix (liustve)
12eca32 linting fix (liustve)
83ec370 linting fix (liustve)
79bbf46 linting fix (liustve)
b6e1b97 remove gen ai handling logic (liustve)
17d0f90 fixed linting (liustve)
3d12858 refactor _init_logging to 1.33.1 version (liustve)
7f90bc7 refactored batch log record processor (liustve)
fdddb7a Merge remote-tracking branch 'upstream/main' into logs-mainline (liustve)
4b7bb0e linting (liustve)
8c64adb lint fix (liustve)
01e3fd8 update configuration and tests (liustve)
2f0268c lint fix (liustve)
7dbcb7e linting fix (liustve)
886b009 Merge remote-tracking branch 'upstream/main' into logs-mainline (liustve)
93e2836 add cycle detection (liustve)
cb21d39 add comment about termination of loop (liustve)
651f283 Merge branch 'main' into logs-mainline (liustve)
ff2fb5d refactored otlp aws log exporter, add comments aws batch log processor (liustve)
153679a Merge branch 'main' into logs-mainline (liustve)
bce91dc linting fix (liustve)
8479ac9 Merge branch 'logs-mainline' of https://github.com/liustve/aws-otel-p… (liustve)
76e4b47 remove shut down check before sleep (liustve)
6dd6a67 add better estimation for non-ascii characters (liustve)
502eb01 linting + formatting fix (liustve)
b30ad4f fix unit test (liustve)
8b7e671 linting fix (liustve)
dc98cf8 add interruptible shutdown (liustve)
3450a11 fix sleep unit tests + renaming aws batch log processor (liustve)
7a83e92 linting fix (liustve)
a38d43d fix test (liustve)
726a9a8 linting fix (liustve)
fc77123 linting fix (liustve)
f571ffb linting fix (liustve)
180 changes: 180 additions & 0 deletions
180
.../src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py
@@ -0,0 +1,180 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import Mapping, Optional, Sequence, cast

from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.util.types import AnyValue

_logger = logging.getLogger(__name__)


class AwsCloudWatchOtlpBatchLogRecordProcessor(BatchLogRecordProcessor):
    """
    Custom implementation of BatchLogRecordProcessor that manages log record batching
    with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.

    This processor still exports all logs up to _max_export_batch_size, but rather than performing exactly
    one export per batch, it estimates log sizes and performs multiple exports of sub-batches,
    where each exported sub-batch satisfies an additional constraint:

    If the batch to be exported has a data size > 1 MB, it is split into multiple
    exports of sub-batches, each with a data size <= 1 MB.

    The one exception is a single log whose data size is > 1 MB; in that case the
    sub-batch contains exactly that one log.
    """

    _BASE_LOG_BUFFER_BYTE_SIZE = (
        1000  # Buffer size in bytes to account for log metadata not included in the body or attribute size calculation
    )
    _MAX_LOG_REQUEST_BYTE_SIZE = (
        1048576  # Maximum uncompressed/unserialized bytes per request -
        # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
    )

    def __init__(
        self,
        exporter: OTLPAwsLogExporter,
        schedule_delay_millis: Optional[float] = None,
        max_export_batch_size: Optional[int] = None,
        export_timeout_millis: Optional[float] = None,
        max_queue_size: Optional[int] = None,
    ):
        super().__init__(
            exporter=exporter,
            schedule_delay_millis=schedule_delay_millis,
            max_export_batch_size=max_export_batch_size,
            export_timeout_millis=export_timeout_millis,
            max_queue_size=max_queue_size,
        )

        self._exporter = exporter

    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
        """
        Explicitly overrides the upstream _export method to add AWS CloudWatch size-based batching.
        See:
        https://github.com/open-telemetry/opentelemetry-python/blob/bb21ebd46d070c359eee286c97bdf53bfd06759d/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143

        Preserves the existing batching behavior, but exports intermediate sub-batches whenever
        adding the next log would push the batch's data size above AWS CloudWatch's maximum
        request size limit of 1 MB:

        - The data size of an exported batch will ALWAYS be <= 1 MB, except for the case below:
        - If the data size of an exported batch is ever > 1 MB, then the batch size is guaranteed to be 1.
        """
        with self._export_lock:
            iteration = 0
            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
            # once the lock is obtained to see if we still need to make the requested export.
            while self._should_export_batch(batch_strategy, iteration):
                iteration += 1
                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
                try:
                    batch_length = min(self._max_export_batch_size, len(self._queue))
                    batch_data_size = 0
                    batch = []

                    for _ in range(batch_length):
                        log_data: LogData = self._queue.pop()
                        log_size = self._estimate_log_size(log_data)

                        # Flush the current sub-batch before adding a log that would push it over the limit
                        if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
                            self._exporter.export(batch)
                            batch_data_size = 0
                            batch = []

                        batch_data_size += log_size
                        batch.append(log_data)

                    if batch:
                        self._exporter.export(batch)
                except Exception as exception:  # pylint: disable=broad-exception-caught
                    _logger.exception("Exception while exporting logs: %s", exception)
                detach(token)

    def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:  # pylint: disable=too-many-branches
        """
        Estimates the size in bytes of a log by calculating the size of its body and its attributes
        and adding a buffer amount to account for other log metadata information.
        Processes complex log structures up to the specified depth limit, with cycle detection
        to avoid processing the same log content more than once.
        If the depth limit of the log structure is exceeded, returns the truncated calculation
        of everything up to that point.

        Args:
            log: The Log object to calculate the size of
            depth: Maximum depth to traverse in nested structures (default: 3)

        Returns:
            int: The estimated size of the log object in bytes
        """

        # Queue contains tuples of (log_content, depth) where:
        # - log_content is the current piece of log data being processed
        # - depth tracks how many levels deep we've traversed to reach this content
        # - body starts at depth 0 since it's an AnyValue object
        # - attributes start at depth -1 since it's a Mapping[str, AnyValue] - when traversed, we will
        #   start processing its keys at depth 0
        queue = [(log.log_record.body, 0), (log.log_record.attributes, -1)]

        # Track visited complex log contents to avoid calculating the same one more than once
        visited = set()

        size: int = self._BASE_LOG_BUFFER_BYTE_SIZE

        while queue:
            new_queue = []

            for data in queue:
                # Small optimization: we can stop calculating the size once it reaches the 1 MB limit.
                if size >= self._MAX_LOG_REQUEST_BYTE_SIZE:
                    return size

                next_val, current_depth = data

                if next_val is None:
                    continue

                # bool must be checked before int since bool is a subclass of int
                if isinstance(next_val, bool):
                    size += 4 if next_val else 5  # len("true") / len("false")
                    continue

                if isinstance(next_val, (str, bytes)):
                    size += len(next_val)
                    continue

                if isinstance(next_val, (float, int)):
                    size += len(str(next_val))
                    continue

                # next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"]
                if current_depth <= depth:
                    # id() is unique among simultaneously existing objects,
                    # see: https://www.w3schools.com/python/ref_func_id.asp
                    obj_id = id(next_val)

                    if obj_id in visited:
                        continue
                    visited.add(obj_id)

                    if isinstance(next_val, Sequence):
                        for content in next_val:
                            new_queue.append((cast(AnyValue, content), current_depth + 1))

                    if isinstance(next_val, Mapping):
                        for key, content in next_val.items():
                            size += len(key)
                            new_queue.append((content, current_depth + 1))
                else:
                    _logger.debug(
                        "Max log depth of %s exceeded. Log data size will not be accurately calculated.", depth
                    )

            queue = new_queue

        return size
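
To make the traversal in _estimate_log_size concrete, here is a simplified, self-contained mirror of its logic on plain Python dicts and lists (stand-ins for the SDK's Mapping/Sequence AnyValue types). The _BASE constant stands in for _BASE_LOG_BUFFER_BYTE_SIZE, and the 1 MB early exit is omitted; this is illustrative only, not part of the PR:

_BASE = 1000  # stands in for _BASE_LOG_BUFFER_BYTE_SIZE

def estimate(body, max_depth=3):
    size, visited = _BASE, set()
    queue = [(body, 0)]
    while queue:
        new_queue = []
        for val, current_depth in queue:
            if val is None:
                continue
            if isinstance(val, bool):  # bool before int: bool subclasses int
                size += 4 if val else 5
            elif isinstance(val, (str, bytes)):
                size += len(val)
            elif isinstance(val, (int, float)):
                size += len(str(val))
            elif current_depth <= max_depth and id(val) not in visited:
                visited.add(id(val))
                if isinstance(val, dict):
                    for key, content in val.items():
                        size += len(key)
                        new_queue.append((content, current_depth + 1))
                elif isinstance(val, (list, tuple)):
                    new_queue.extend((content, current_depth + 1) for content in val)
        queue = new_queue
    return size

cyclic = {"msg": "hi"}
cyclic["self"] = cyclic  # the cycle is visited once, then skipped
print(estimate(cyclic))  # 1000 + len("msg") + len("hi") + len("self") = 1009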
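
The sub-batching rule in _export can likewise be illustrated on plain integers standing in for estimated log sizes. This sketch mirrors the flush decision from the loop above and is illustrative only:

# Mirrors the flush decision in _export: flush the current sub-batch before
# adding a log that would push it over the 1 MB limit; a single oversized log
# still goes out alone.
_MAX = 1048576

def split_batches(estimated_sizes):
    batches, current, current_size = [], [], 0
    for size in estimated_sizes:
        if current and current_size + size > _MAX:
            batches.append(current)
            current, current_size = [], 0
        current_size += size
        current.append(size)
    if current:
        batches.append(current)
    return batches

print(split_batches([900_000, 300_000]))  # [[900000], [300000]] - two exports
print(split_batches([2_000_000]))         # [[2000000]] - oversized, exported alone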
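
For context, a minimal sketch of wiring the processor into the SDK logging pipeline. The OTLPAwsLogExporter constructor arguments (an `endpoint` keyword and the URL shown) are assumptions; check the exporter added in this PR for its real signature:

import logging

from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler

from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
    AwsCloudWatchOtlpBatchLogRecordProcessor,
)
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter

# Hypothetical endpoint and constructor argument
exporter = OTLPAwsLogExporter(endpoint="https://logs.us-east-1.amazonaws.com/v1/logs")

provider = LoggerProvider()
provider.add_log_record_processor(AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=exporter))
set_logger_provider(provider)

# Route stdlib logging through OpenTelemetry; records are batched and
# size-split by the processor before each export.
logging.getLogger().addHandler(LoggingHandler(logger_provider=provider))
logging.getLogger(__name__).info("hello from the AWS logs pipeline")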