Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED"


def is_installed(req: str) -> bool:
"""Is the given required package installed?"""

Expand All @@ -24,5 +25,6 @@ def is_installed(req: str) -> bool:
return False
return True


def is_agent_observability_enabled() -> bool:
    """Return True when the AGENT_OBSERVABILITY_ENABLED env var is "true" (case-insensitive)."""
    flag_value = os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false")
    return flag_value.lower() == "true"
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from typing_extensions import override

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
AttributePropagatingSpanProcessorBuilder,
Expand All @@ -22,13 +22,14 @@
AwsMetricAttributesSpanExporterBuilder,
)
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import AwsBatchLogRecordProcessor
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
from opentelemetry._logs import set_logger_provider, get_logger_provider
from opentelemetry._logs import get_logger_provider, set_logger_provider
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
Expand Down Expand Up @@ -83,6 +84,7 @@
DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT"
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT"
METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL"
OTEL_LOGS_EXPORTER = "OTEL_LOGS_EXPORTER"
DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0
AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME"
AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS"
Expand Down Expand Up @@ -181,9 +183,9 @@ def _init_logging(
resource: Resource = None,
):

# Provides a default OTLP log exporter when none is specified.
# Provides a default OTLP log exporter when the environment is not set.
# This is the behavior for the logs exporters for other languages.
if not exporters:
if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) is None:
exporters = {"otlp": OTLPLogExporter}

provider = LoggerProvider(resource=resource)
Expand All @@ -192,7 +194,11 @@ def _init_logging(
for _, exporter_class in exporters.items():
exporter_args: Dict[str, any] = {}
log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))

if isinstance(log_exporter, OTLPAwsLogExporter):
provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter))
else:
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))

handler = LoggingHandler(level=NOTSET, logger_provider=provider)

Expand Down Expand Up @@ -364,12 +370,7 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->

if isinstance(span_exporter, OTLPSpanExporter):
if is_agent_observability_enabled():
logs_endpoint = os.getenv(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint)
span_exporter = OTLPAwsSpanExporter(
endpoint=traces_endpoint,
logger_provider=get_logger_provider()
)
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
else:
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import logging
from typing import Mapping, Optional, Sequence

from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    BatchLogExportStrategy,
    attach,
    detach,
    set_value,
)
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.util.types import AnyValue

_logger = logging.getLogger(__name__)

BASE_LOG_BUFFER_BYTE_SIZE = 2000
MAX_LOG_REQUEST_BYTE_SIZE = (
1048576 # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
)


class AwsBatchLogRecordProcessor(BatchLogRecordProcessor):
    """BatchLogRecordProcessor that respects AWS CloudWatch Logs' OTLP request size limit.

    CloudWatch's OTLP endpoint rejects requests larger than 1 MB
    (MAX_LOG_REQUEST_BYTE_SIZE), so this processor estimates the payload size
    of each queued log and exports sub-batches before the limit is exceeded.
    """

    def __init__(
        self,
        exporter: OTLPAwsLogExporter,
        schedule_delay_millis: Optional[float] = None,
        max_export_batch_size: Optional[int] = None,
        export_timeout_millis: Optional[float] = None,
        max_queue_size: Optional[int] = None,
    ):
        # NOTE(review): annotations previously used `float | None`, which raises at
        # import time on Python < 3.10 without `from __future__ import annotations`;
        # typing.Optional is used instead (the file has no __future__ import).
        super().__init__(
            exporter=exporter,
            schedule_delay_millis=schedule_delay_millis,
            max_export_batch_size=max_export_batch_size,
            export_timeout_millis=export_timeout_millis,
            max_queue_size=max_queue_size,
        )

    # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
        """
        Preserves existing batching behavior but will intermediately export small log batches if
        the size of the data in the batch is at or above AWS CloudWatch's maximum request size limit of 1 MB.

        - Data size of exported batches will ALWAYS be <= 1 MB except for the case below:
        - If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
        """

        with self._export_lock:
            iteration = 0
            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
            # once the lock is obtained to see if we still need to make the requested export.
            while self._should_export_batch(batch_strategy, iteration):

                iteration += 1
                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
                try:
                    batch_length = min(self._max_export_batch_size, len(self._queue))
                    batch_data_size = 0
                    batch = []

                    for _ in range(batch_length):

                        log_data = self._queue.pop()
                        log_size = self._get_size_of_log(log_data)

                        # Flush the accumulated batch before this log would push it past 1 MB.
                        if batch and (batch_data_size + log_size > MAX_LOG_REQUEST_BYTE_SIZE):
                            # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                            if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE:
                                self._exporter.set_gen_ai_flag()

                            self._exporter.export(batch)
                            batch_data_size = 0
                            batch = []

                        batch_data_size += log_size
                        batch.append(log_data)

                    if batch:
                        # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                        if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE:
                            self._exporter.set_gen_ai_flag()

                        self._exporter.export(batch)
                except Exception:  # pylint: disable=broad-exception-caught
                    _logger.exception("Exception while exporting logs.")
                finally:
                    # Fix: detach in a finally block so the suppression token is always
                    # released, even if _should_export_batch internals raise unexpectedly.
                    detach(token)

    @staticmethod
    def _get_size_of_log(log_data: LogData) -> int:
        """
        Estimates the size of a given LogData based on the size of the body + a buffer
        amount representing a rough guess of other data present in the log.
        """
        size = BASE_LOG_BUFFER_BYTE_SIZE
        body = log_data.log_record.body

        if body:
            size += AwsBatchLogRecordProcessor._get_size_of_any_value(body)

        return size

    @staticmethod
    def _get_size_of_any_value(val: AnyValue, seen=None) -> int:
        """
        Recursively estimates the size of an AnyValue in bytes.

        Strings/bytes count their length, bools and numbers the length of their
        string form, and mappings/sequences the sum of their contents. ``seen``
        holds ids of visited containers to guard against reference cycles.
        """

        # str/bytes must be handled before the Sequence check: str is a Sequence.
        if isinstance(val, (str, bytes)):
            return len(val)

        # bool before int: bool is a subclass of int; "true"/"false" are 4/5 chars.
        if isinstance(val, bool):
            return 4 if val else 5

        if isinstance(val, (float, int)):
            return len(str(val))

        # Fix: the original accepted a ``seen`` parameter and imported Mapping/Sequence
        # but never recursed, so nested log bodies were counted as 0 bytes and
        # oversized batches slipped past the 1 MB split logic.
        if isinstance(val, (Mapping, Sequence)):
            if seen is None:
                seen = set()
            if id(val) in seen:
                return 0
            seen.add(id(val))
            if isinstance(val, Mapping):
                return sum(
                    AwsBatchLogRecordProcessor._get_size_of_any_value(key, seen)
                    + AwsBatchLogRecordProcessor._get_size_of_any_value(value, seen)
                    for key, value in val.items()
                )
            return sum(AwsBatchLogRecordProcessor._get_size_of_any_value(item, seen) for item in val)

        return 0
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from typing import Dict, Optional
import gzip
import logging
from io import BytesIO
from time import sleep
from typing import Dict, Optional, Sequence

import requests

from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter, _create_exp_backoff_generator
from opentelemetry.sdk._logs import (
LogData,
)
from opentelemetry.sdk._logs.export import (
LogExportResult,
)

_logger = logging.getLogger(__name__)


class OTLPAwsLogExporter(OTLPLogExporter):
_LARGE_LOG_HEADER = {"x-aws-log-semantics": "otel"}
_RETRY_AFTER_HEADER = "Retry-After" # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling

def __init__(
self,
endpoint: Optional[str] = None,
Expand All @@ -18,6 +36,7 @@ def __init__(
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
):
self._gen_ai_flag = False
self._aws_region = None

if endpoint:
Expand All @@ -34,3 +53,133 @@ def __init__(
compression=Compression.Gzip,
session=AwsAuthSession(aws_region=self._aws_region, service="logs"),
)

# https://github.com/open-telemetry/opentelemetry-python/blob/main/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167
def export(self, batch: Sequence[LogData]) -> LogExportResult:
    """
    Exports the given batch of OTLP log data.
    Behaviors of how this export will work -

    1. Always compresses the serialized data into gzip before sending.

    2. If self._gen_ai_flag is enabled, the log data is > 1 MB
       and the assumption is that the log is a normalized gen.ai LogEvent.
        - inject the 'x-aws-log-semantics' flag into the header.

    3. Retry behavior is now the following:
        - if the response contains a status code that is retryable and the response contains Retry-After in its
          headers, the serialized data will be exported after that set delay

        - if the response does not contain that Retry-After header, default back to the current iteration of the
          exponential backoff delay
    """

    if self._shutdown:
        _logger.warning("Exporter already shutdown, ignoring batch")
        return LogExportResult.FAILURE

    serialized_data = encode_logs(batch).SerializeToString()

    # Always gzip-compress the payload (the session is configured for gzip).
    gzip_data = BytesIO()
    with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
        gzip_stream.write(serialized_data)

    data = gzip_data.getvalue()

    backoff = _create_exp_backoff_generator(max_value=self._MAX_RETRY_TIMEOUT)

    while True:
        resp = self._send(data)

        if resp.ok:
            # Fix: clear the per-batch gen_ai flag on success as well; previously it
            # was only cleared on failure paths, so one oversized batch made every
            # subsequent export carry the large-log header.
            self._gen_ai_flag = False
            return LogExportResult.SUCCESS

        if not self._retryable(resp):
            _logger.error(
                "Failed to export logs batch code: %s, reason: %s",
                resp.status_code,
                resp.text,
            )
            self._gen_ai_flag = False
            return LogExportResult.FAILURE

        # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
        maybe_retry_after = resp.headers.get(self._RETRY_AFTER_HEADER, None)

        # Prefer the server-provided Retry-After delay; otherwise fall back to the
        # next iteration of the exponential backoff strategy.
        delay = self._parse_retryable_header(maybe_retry_after)

        if delay == -1:
            delay = next(backoff, self._MAX_RETRY_TIMEOUT)

            # Fix: only treat the export as exhausted when the backoff generator
            # itself runs out; previously a server-provided Retry-After value that
            # happened to equal _MAX_RETRY_TIMEOUT also aborted the export.
            if delay == self._MAX_RETRY_TIMEOUT:
                _logger.error(
                    "Transient error %s encountered while exporting logs batch. "
                    "No Retry-After header found and all backoff retries exhausted. "
                    "Logs will not be exported.",
                    resp.reason,
                )
                self._gen_ai_flag = False
                return LogExportResult.FAILURE

        _logger.warning(
            "Transient error %s encountered while exporting logs batch, retrying in %ss.",
            resp.reason,
            delay,
        )

        sleep(delay)

def set_gen_ai_flag(self):
    """Mark the next export as a large gen_ai payload so the LLO header is attached to the request."""
    self._gen_ai_flag = True

def _send(self, serialized_data: bytes):
    """POST the gzip-compressed payload to the OTLP logs endpoint.

    Retries exactly once, immediately, when the connection drops (e.g. a stale
    keep-alive connection); any second failure propagates to the caller.
    """

    def _post():
        # Single definition of the request so both attempts stay in sync.
        return self._session.post(
            url=self._endpoint,
            headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None,
            data=serialized_data,
            verify=self._certificate_file,
            timeout=self._timeout,
            cert=self._client_cert,
        )

    try:
        return _post()
    except (ConnectionError, requests.exceptions.ConnectionError):
        # Fix: requests.exceptions.ConnectionError derives from RequestException
        # (an IOError subclass), NOT from the builtin ConnectionError, so the
        # original `except ConnectionError` never caught requests' connection
        # failures and the intended one-shot retry never ran.
        return _post()

@staticmethod
def _retryable(resp: requests.Response) -> bool:
    """Return True when the response status indicates the export should be retried.

    429 (Too Many Requests) and 503 (Service Unavailable) are throttling
    signals per the OTLP/HTTP spec; anything else defers to the base exporter.
    """
    return resp.status_code in (429, 503) or OTLPLogExporter._retryable(resp)

@staticmethod
def _parse_retryable_header(retry_header: Optional[str]) -> float:
    """Convert a Retry-After header value into a delay in seconds.

    Returns -1 when the header is absent, empty, non-numeric, or negative.
    """
    if not retry_header:
        return -1

    try:
        seconds = float(retry_header)
    except ValueError:
        return -1

    return seconds if seconds >= 0 else -1
Loading