Merged

Commits (39)
c34f3b9
add logs pipeline
liustve Jun 18, 2025
010e7df
add logs pipeline
liustve Jun 20, 2025
24f4308
Merge remote-tracking branch 'origin/mainline' into logs-mainline
liustve Jun 20, 2025
b75fe99
linting fix
liustve Jun 20, 2025
d588605
linting fix
liustve Jun 20, 2025
c78aca5
linting fix
liustve Jun 20, 2025
12eca32
linting fix
liustve Jun 20, 2025
83ec370
linting fix
liustve Jun 20, 2025
79bbf46
linting fix
liustve Jun 20, 2025
b6e1b97
remove gen ai handling logic
liustve Jun 23, 2025
17d0f90
fixed linting
liustve Jun 23, 2025
3d12858
refactor _init_logging to 1.33.1 version
liustve Jun 24, 2025
7f90bc7
refactored batch log record processor
liustve Jun 24, 2025
fdddb7a
Merge remote-tracking branch 'upstream/main' into logs-mainline
liustve Jun 24, 2025
4b7bb0e
linting
liustve Jun 24, 2025
8c64adb
lint fix
liustve Jun 24, 2025
01e3fd8
update configuration and tests
liustve Jun 24, 2025
2f0268c
lint fix
liustve Jun 24, 2025
7dbcb7e
linting fix
liustve Jun 24, 2025
886b009
Merge remote-tracking branch 'upstream/main' into logs-mainline
liustve Jun 28, 2025
93e2836
add cycle detection
liustve Jun 28, 2025
cb21d39
add comment about termination of loop
liustve Jun 30, 2025
651f283
Merge branch 'main' into logs-mainline
liustve Jul 1, 2025
ff2fb5d
refactored otlp aws log exporter, add comments aws batch log processor
liustve Jul 1, 2025
153679a
Merge branch 'main' into logs-mainline
liustve Jul 1, 2025
bce91dc
linting fix
liustve Jul 1, 2025
8479ac9
Merge branch 'logs-mainline' of https://github.com/liustve/aws-otel-p…
liustve Jul 1, 2025
76e4b47
remove shut down check before sleep
liustve Jul 1, 2025
6dd6a67
add better estimation for non-ascii characters
liustve Jul 2, 2025
502eb01
linting + formatting fix
liustve Jul 2, 2025
b30ad4f
fix unit test
liustve Jul 2, 2025
8b7e671
linting fix
liustve Jul 2, 2025
dc98cf8
add interruptible shutdown
liustve Jul 2, 2025
3450a11
fix sleep unit tests + renaming aws batch log processor
liustve Jul 2, 2025
7a83e92
linting fix
liustve Jul 2, 2025
a38d43d
fix test
liustve Jul 2, 2025
726a9a8
linting fix
liustve Jul 2, 2025
fc77123
linting fix
liustve Jul 2, 2025
f571ffb
linting fix
liustve Jul 2, 2025
amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
@@ -1,9 +1,10 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
+import logging
 import os
 import re
-from logging import NOTSET, Logger, getLogger
+from logging import Logger, getLogger
 from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union

 from importlib_metadata import version
@@ -22,12 +23,16 @@
     AwsMetricAttributesSpanExporterBuilder,
 )
 from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
+    AwsCloudWatchOtlpBatchLogRecordProcessor,
+)
 from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
 from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
 from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
 from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
 from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
+from opentelemetry._events import set_event_logger_provider
 from opentelemetry._logs import get_logger_provider, set_logger_provider
 from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
@@ -42,7 +47,9 @@
     _import_id_generator,
     _import_sampler,
     _OTelSDKConfigurator,
+    _patch_basic_config,
 )
+from opentelemetry.sdk._events import EventLoggerProvider
 from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
 from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
 from opentelemetry.sdk.environment_variables import (
@@ -197,26 +204,28 @@ def _initialize_components():


 def _init_logging(
-    exporters: Dict[str, Type[LogExporter]],
-    resource: Resource = None,
+    exporters: dict[str, Type[LogExporter]],
+    resource: Optional[Resource] = None,
+    setup_logging_handler: bool = True,
 ):

     # Provides a default OTLP log exporter when none is specified.
     # This is the behavior for the logs exporters for other languages.
     if not exporters:
         exporters = {"otlp": OTLPLogExporter}

     provider = LoggerProvider(resource=resource)
     set_logger_provider(provider)

     for _, exporter_class in exporters.items():
-        exporter_args: Dict[str, any] = {}
-        log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
-        provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
+        exporter_args = {}
+        log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args))
+        log_processor = _customize_log_record_processor(log_exporter)
+        provider.add_log_record_processor(log_processor)

+    event_logger_provider = EventLoggerProvider(logger_provider=provider)
+    set_event_logger_provider(event_logger_provider)

-    handler = LoggingHandler(level=NOTSET, logger_provider=provider)
-
-    getLogger().addHandler(handler)
+    if setup_logging_handler:
+        _patch_basic_config()
+
+        # Add OTel handler
+        handler = LoggingHandler(level=logging.NOTSET, logger_provider=provider)
+        logging.getLogger().addHandler(handler)


 def _init_tracing(
@@ -417,7 +426,14 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
     return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()


-def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> LogExporter:
+def _customize_log_record_processor(log_exporter: LogExporter):
+    if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
+        return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)
+
+    return BatchLogRecordProcessor(exporter=log_exporter)
+
+
+def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
     logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)

     if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
@@ -586,7 +602,7 @@ def _is_lambda_environment():
     return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


-def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> bool:
+def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool:
     """Is the given endpoint an AWS OTLP endpoint?"""

     pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN
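Taken together, the configurator changes route logs through the new processor only on the AWS CloudWatch OTLP path. A minimal sketch of the configuration this implies is below; note that the endpoint format (matching AWS_LOGS_OTLP_ENDPOINT_PATTERN) and the AGENT_OBSERVABILITY_ENABLED variable (assumed to back is_agent_observability_enabled()) come from the surrounding distro code, not from this diff:

import os

# Point the OTLP logs exporter at a CloudWatch Logs OTLP endpoint so that
# _is_aws_otlp_endpoint(..., "logs") matches and _customize_logs_exporter
# swaps in an OTLPAwsLogExporter (the region is a placeholder).
os.environ["OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"] = "https://logs.us-west-2.amazonaws.com/v1/logs"

# Assumed flag behind is_agent_observability_enabled(); when enabled,
# _customize_log_record_processor wraps the exporter in
# AwsCloudWatchOtlpBatchLogRecordProcessor rather than BatchLogRecordProcessor.
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"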
amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py (new file)
@@ -0,0 +1,212 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.

import logging
from typing import Mapping, Optional, Sequence, cast

from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.util.types import AnyValue

_logger = logging.getLogger(__name__)


class AwsCloudWatchOtlpBatchLogRecordProcessor(BatchLogRecordProcessor):
"""
Custom implementation of BatchLogRecordProcessor that manages log record batching
with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.

This processor still exports all logs up to _MAX_LOG_REQUEST_BYTE_SIZE but rather than doing exactly
one export, we will estimate log sizes and do multiple batch exports
where each exported batch will have an additional constraint:

If the batch to be exported will have a data size of > 1 MB:
The batch will be split into multiple exports of sub-batches of data size <= 1 MB.

A unique case is if the sub-batch is of data size > 1 MB, then the sub-batch will have exactly 1 log in it.
"""

    # OTel log events include fixed metadata attributes, so the metadata size can be
    # estimated on a best-effort basis as:
# service.name (255 chars) + cloud.resource_id (max ARN length) + telemetry.xxx (~20 chars) +
# common attributes (255 chars) +
# scope + flags + traceId + spanId + numeric/timestamp fields + ...
# Example log structure:
# {
# "resource": {
# "attributes": {
# "aws.local.service": "example-service123",
# "telemetry.sdk.language": "python",
# "service.name": "my-application",
# "cloud.resource_id": "example-resource",
# "aws.log.group.names": "example-log-group",
# "aws.ai.agent.type": "default",
# "telemetry.sdk.version": "1.x.x",
# "telemetry.auto.version": "0.x.x",
# "telemetry.sdk.name": "opentelemetry"
# }
# },
# "scope": {"name": "example.instrumentation.library"},
# "timeUnixNano": 1234567890123456789,
# "observedTimeUnixNano": 1234567890987654321,
# "severityNumber": 9,
# "body": {...},
# "attributes": {...},
# "flags": 1,
# "traceId": "abcd1234efgh5678ijkl9012mnop3456",
# "spanId": "1234abcd5678efgh"
# }
    # 2000 might be a slight overestimate, but it is better to overestimate the size of a log
    # and suffer a small performance impact from extra batching than to underestimate it and
    # risk a large log being dropped when sent to the AWS OTLP endpoint.
_BASE_LOG_BUFFER_BYTE_SIZE = 2000

_MAX_LOG_REQUEST_BYTE_SIZE = (
1048576 # Maximum uncompressed/unserialized bytes / request -
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
)
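
    # Worked example of how these constants combine (illustrative, not from the PR):
    # a log whose body is a 500-character ASCII string and whose attributes hold
    # ~300 bytes of keys and values is estimated at roughly 2000 + 500 + 300 = 2800
    # bytes, well under the 1,048,576-byte request cap.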

def __init__(
self,
exporter: OTLPAwsLogExporter,
schedule_delay_millis: Optional[float] = None,
max_export_batch_size: Optional[int] = None,
export_timeout_millis: Optional[float] = None,
max_queue_size: Optional[int] = None,
):

super().__init__(
exporter=exporter,
schedule_delay_millis=schedule_delay_millis,
max_export_batch_size=max_export_batch_size,
export_timeout_millis=export_timeout_millis,
max_queue_size=max_queue_size,
)

self._exporter = exporter

def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
"""
Explicitly overrides upstream _export method to add AWS CloudWatch size-based batching
See:
https://github.com/open-telemetry/opentelemetry-python/blob/bb21ebd46d070c359eee286c97bdf53bfd06759d/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143

Preserves existing batching behavior but will intermediarly export small log batches if
the size of the data in the batch is estimated to be at or above AWS CloudWatch's
maximum request size limit of 1 MB.

- Estimated data size of exported batches will typically be <= 1 MB except for the case below:
- If the estimated data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
"""
with self._export_lock:
iteration = 0
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
# once the lock is obtained to see if we still need to make the requested export.
while self._should_export_batch(batch_strategy, iteration):
iteration += 1
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
batch_length = min(self._max_export_batch_size, len(self._queue))
batch_data_size = 0
batch = []

for _ in range(batch_length):
log_data: LogData = self._queue.pop()
log_size = self._estimate_log_size(log_data)

if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
self._exporter.export(batch)
batch_data_size = 0
batch = []

batch_data_size += log_size
batch.append(log_data)

if batch:
self._exporter.export(batch)
except Exception as exception: # pylint: disable=broad-exception-caught
_logger.exception("Exception while exporting logs: %s", exception)
detach(token)
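
    # Worked example of the splitting rule above (illustrative, not from the PR):
    # suppose three queued logs have estimated sizes of 600 KB, 600 KB and 30 KB.
    # The sub-batch starts as [log1] (600 KB); adding log2 would raise the estimate
    # to 1.2 MB, above _MAX_LOG_REQUEST_BYTE_SIZE, so [log1] is exported and a new
    # sub-batch begins. log3 then fits alongside log2 (630 KB), so the final export
    # is [log2, log3]: two export calls replace the single upstream export.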

def _estimate_log_size(self, log: LogData, depth: int = 3) -> int: # pylint: disable=too-many-branches
"""
Estimates the size in bytes of a log by calculating the size of its body and its attributes
and adding a buffer amount to account for other log metadata information.
Will process complex log structures up to the specified depth limit.
Includes cycle detection to prevent processing the log content more than once.
If the depth limit of the log structure is exceeded, returns the truncated calculation
to everything up to that point.

Args:
log: The Log object to calculate size for
depth: Maximum depth to traverse in nested structures (default: 3)

Returns:
int: The estimated size of the log object in bytes
"""

# Queue contains tuples of (log_content, depth) where:
# - log_content is the current piece of log data being processed
# - depth tracks how many levels deep we've traversed to reach this content
# - body starts at depth 0 since it's an AnyValue object
# - Attributes start at depth -1 since it's a Mapping[str, AnyValue] - when traversed, we will
# start processing its keys at depth 0
queue = [(log.log_record.body, 0), (log.log_record.attributes, -1)]

# Track visited complex log contents to avoid calculating the same one more than once
visited = set()

size: int = self._BASE_LOG_BUFFER_BYTE_SIZE

while queue:
new_queue = []

for data in queue:
                # Small optimization: stop calculating the size once it reaches the 1 MB limit.
if size >= self._MAX_LOG_REQUEST_BYTE_SIZE:
return size

next_val, current_depth = data

if next_val is None:
continue

if isinstance(next_val, (str, bytes)):
size += len(next_val)
continue

if isinstance(next_val, (float, int, bool)):
size += len(str(next_val))
continue

# next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"]
# See: https://github.com/open-telemetry/opentelemetry-python/blob/\
# 9426d6da834cfb4df7daedd4426bba0aa83165b5/opentelemetry-api/src/opentelemetry/util/types.py#L20
if current_depth <= depth:
                    # id() is guaranteed to be unique for the object's lifetime, see:
                    # https://www.w3schools.com/python/ref_func_id.asp
                    obj_id = id(next_val)
if obj_id in visited:
continue
visited.add(obj_id)

if isinstance(next_val, Sequence):
for content in next_val:
new_queue.append((cast(AnyValue, content), current_depth + 1))

if isinstance(next_val, Mapping):
for key, content in next_val.items():
size += len(key)
new_queue.append((content, current_depth + 1))
else:
_logger.debug(
"Max log depth of %s exceeded. Log data size will not be accurately calculated.", depth
)

queue = new_queue

return size
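
To make the two pieces above concrete, here is a minimal, self-contained sketch (illustrative only, not part of the PR): estimate() is a simplified stand-in for _estimate_log_size applied to a log body alone (it omits the cycle detection), and the loop at the bottom mirrors the sub-batch splitting decision in _export. The constants shadow _BASE_LOG_BUFFER_BYTE_SIZE and _MAX_LOG_REQUEST_BYTE_SIZE.

from collections.abc import Mapping, Sequence
from typing import Any

_BASE = 2000      # mirrors _BASE_LOG_BUFFER_BYTE_SIZE
_MAX = 1048576    # mirrors _MAX_LOG_REQUEST_BYTE_SIZE (1 MB)


def estimate(body: Any, depth: int = 3) -> int:
    """Simplified stand-in for _estimate_log_size, applied to a body alone."""
    size = _BASE
    queue = [(body, 0)]
    while queue:
        new_queue = []
        for value, level in queue:
            if value is None:
                continue
            if isinstance(value, (str, bytes)):
                size += len(value)
            elif isinstance(value, (bool, int, float)):
                size += len(str(value))
            elif level <= depth:
                if isinstance(value, Mapping):
                    for key, item in value.items():
                        size += len(key)
                        new_queue.append((item, level + 1))
                elif isinstance(value, Sequence):
                    new_queue.extend((item, level + 1) for item in value)
        queue = new_queue
    return size


# Three bodies of ~400 KB each: the first two fit into a single <= 1 MB request;
# the third would push the running estimate past _MAX, so it opens a new
# sub-batch. This is the same decision _export makes for each popped log.
sizes = [estimate("x" * 400_000) for _ in range(3)]
batches, batch_size = [[]], 0
for log_size in sizes:
    if batches[-1] and batch_size + log_size > _MAX:
        batches.append([])
        batch_size = 0
    batch_size += log_size
    batches[-1].append(log_size)
print([len(b) for b in batches])  # prints [2, 1]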