diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py index 011a1d19d..f550ffeb3 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py @@ -11,6 +11,7 @@ AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED" + def is_installed(req: str) -> bool: """Is the given required package installed?""" @@ -24,5 +25,6 @@ def is_installed(req: str) -> bool: return False return True + def is_agent_observability_enabled() -> bool: return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index b2a695536..7f46ab3f9 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -10,8 +10,8 @@ from typing_extensions import override from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE -from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute +from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import ( AttributePropagatingSpanProcessorBuilder, @@ -22,13 +22,14 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder +from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import AwsBatchLogRecordProcessor from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView -from opentelemetry._logs import set_logger_provider, get_logger_provider +from opentelemetry._logs import get_logger_provider, set_logger_provider from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter @@ -83,6 +84,7 @@ DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT" APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT" METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL" +OTEL_LOGS_EXPORTER = "OTEL_LOGS_EXPORTER" DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0 AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME" AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS" @@ -181,9 +183,9 @@ def _init_logging( resource: Resource = 
None, ): - # Provides a default OTLP log exporter when none is specified. + # Provides a default OTLP log exporter when the OTEL_LOGS_EXPORTER environment variable is not set. # This is the behavior for the logs exporters for other languages. - if not exporters: + if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) is None: exporters = {"otlp": OTLPLogExporter} provider = LoggerProvider(resource=resource) @@ -192,7 +194,11 @@ def _init_logging( for _, exporter_class in exporters.items(): exporter_args: Dict[str, any] = {} log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource) - provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter)) + + if isinstance(log_exporter, OTLPAwsLogExporter): + provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter)) + else: + provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter)) handler = LoggingHandler(level=NOTSET, logger_provider=provider) @@ -364,12 +370,7 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> if isinstance(span_exporter, OTLPSpanExporter): if is_agent_observability_enabled(): - logs_endpoint = os.getenv(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT) - logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint) - span_exporter = OTLPAwsSpanExporter( - endpoint=traces_endpoint, - logger_provider=get_logger_provider() - ) + span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider()) else: span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py new file mode 100644 index 000000000..92156ed67 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py @@ -0,0 +1,122 @@ +import logging +from typing import Mapping, Sequence + +from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs._internal.export import ( + _SUPPRESS_INSTRUMENTATION_KEY, + BatchLogExportStrategy, + attach, + detach, + set_value, +) +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.util.types import AnyValue + +_logger = logging.getLogger(__name__) + +BASE_LOG_BUFFER_BYTE_SIZE = 2000 +MAX_LOG_REQUEST_BYTE_SIZE = ( + 1048576 # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html +) + + +class AwsBatchLogRecordProcessor(BatchLogRecordProcessor): + + def __init__( + self, + exporter: OTLPAwsLogExporter, + schedule_delay_millis: float | None = None, + max_export_batch_size: int | None = None, + export_timeout_millis: float | None = None, + max_queue_size: int | None = None, + ): + + super().__init__( + exporter=exporter, + schedule_delay_millis=schedule_delay_millis, + max_export_batch_size=max_export_batch_size, + export_timeout_millis=export_timeout_millis, + max_queue_size=max_queue_size, + ) + + # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143 + def _export(self, batch_strategy: BatchLogExportStrategy) -> None: + """ + Preserves the existing batching behavior but will export smaller intermediate batches if + the size of the data in the
batch is at or above AWS CloudWatch's maximum request size limit of 1 MB. + + - Data size of exported batches will ALWAYS be <= 1 MB except for the case below: + - If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1 + """ + + with self._export_lock: + iteration = 0 + # We could see concurrent export calls from worker and force_flush. We call _should_export_batch + # once the lock is obtained to see if we still need to make the requested export. + while self._should_export_batch(batch_strategy, iteration): + + iteration += 1 + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + batch_length = min(self._max_export_batch_size, len(self._queue)) + batch_data_size = 0 + batch = [] + + for _ in range(batch_length): + + log_data = self._queue.pop() + log_size = self._get_size_of_log(log_data) + + if batch and (batch_data_size + log_size > MAX_LOG_REQUEST_BYTE_SIZE): + # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1 + if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE: + self._exporter.set_gen_ai_flag() + + self._exporter.export(batch) + batch_data_size = 0 + batch = [] + + batch_data_size += log_size + batch.append(log_data) + + if batch: + # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1 + if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE: + self._exporter.set_gen_ai_flag() + + self._exporter.export(batch) + except Exception: # pylint: disable=broad-exception-caught + _logger.exception("Exception while exporting logs.") + detach(token) + + @staticmethod + def _get_size_of_log(log_data: LogData) -> int: + """ + Estimates the size of a given LogData based on the size of the body + a buffer + amount representing a rough guess of other data present in the log. + """ + size = BASE_LOG_BUFFER_BYTE_SIZE + body = log_data.log_record.body + + if body: + size += AwsBatchLogRecordProcessor._get_size_of_any_value(body) + + return size + + @staticmethod + def _get_size_of_any_value(val: AnyValue, seen=None) -> int: + """ + Recursively calculates the size of an AnyValue type in bytes. + """ + + if isinstance(val, (str, bytes)): + return len(val) + + if isinstance(val, bool): + return 4 if val else 5 + + if isinstance(val, (float, int)): + return len(str(val)) + + return 0 diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py index 048632c06..126adccd4 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py @@ -1,14 +1,32 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0 -from typing import Dict, Optional +import gzip +import logging +from io import BytesIO +from time import sleep +from typing import Dict, Optional, Sequence + +import requests from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession +from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter, _create_exp_backoff_generator +from opentelemetry.sdk._logs import ( + LogData, +) +from opentelemetry.sdk._logs.export import ( + LogExportResult, +) + +_logger = logging.getLogger(__name__) class OTLPAwsLogExporter(OTLPLogExporter): + _LARGE_LOG_HEADER = {"x-aws-log-semantics": "otel"} + _RETRY_AFTER_HEADER = "Retry-After" # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + def __init__( self, endpoint: Optional[str] = None, @@ -18,6 +36,7 @@ def __init__( headers: Optional[Dict[str, str]] = None, timeout: Optional[int] = None, ): + self._gen_ai_flag = False self._aws_region = None if endpoint: @@ -34,3 +53,133 @@ def __init__( compression=Compression.Gzip, session=AwsAuthSession(aws_region=self._aws_region, service="logs"), ) + + # https://github.com/open-telemetry/opentelemetry-python/blob/main/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167 + def export(self, batch: Sequence[LogData]) -> LogExportResult: + """ + Exports the given batch of OTLP log data. + This export behaves as follows: + + 1. Always compresses the serialized data into gzip before sending. + + 2. If self._gen_ai_flag is enabled, the log data is > 1 MB + and the assumption is that the log is a normalized gen_ai LogEvent. + - inject the 'x-aws-log-semantics' flag into the header. + + 3. Retry behavior is the following: + - if the response contains a status code that is retryable and the response contains Retry-After in its + headers, the serialized data will be exported after that set delay + + - if the response does not contain that Retry-After header, default back to the current iteration of the + exponential backoff delay + """ + + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return LogExportResult.FAILURE + + serialized_data = encode_logs(batch).SerializeToString() + + gzip_data = BytesIO() + with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: + gzip_stream.write(serialized_data) + + data = gzip_data.getvalue() + + backoff = _create_exp_backoff_generator(max_value=self._MAX_RETRY_TIMEOUT) + + while True: + resp = self._send(data) + + if resp.ok: + return LogExportResult.SUCCESS + + if not self._retryable(resp): + _logger.error( + "Failed to export logs batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + self._gen_ai_flag = False + return LogExportResult.FAILURE + + # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + maybe_retry_after = resp.headers.get(self._RETRY_AFTER_HEADER, None) + + # Set the next retry delay to the value of the Retry-After response in the headers. + # If Retry-After is not present in the headers, default to the next iteration of the + # exponential backoff strategy.
+ + delay = self._parse_retryable_header(maybe_retry_after) + + if delay == -1: + delay = next(backoff, self._MAX_RETRY_TIMEOUT) + + if delay == self._MAX_RETRY_TIMEOUT: + _logger.error( + "Transient error %s encountered while exporting logs batch. " + "No Retry-After header found and all backoff retries exhausted. " + "Logs will not be exported.", + resp.reason, + ) + self._gen_ai_flag = False + return LogExportResult.FAILURE + + _logger.warning( + "Transient error %s encountered while exporting logs batch, retrying in %ss.", + resp.reason, + delay, + ) + + sleep(delay) + + def set_gen_ai_flag(self): + """ + Sets the gen_ai flag to True to signal that the LLO header should be injected into the headers of the export request. + """ + self._gen_ai_flag = True + + def _send(self, serialized_data: bytes): + try: + return self._session.post( + url=self._endpoint, + headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None, + data=serialized_data, + verify=self._certificate_file, + timeout=self._timeout, + cert=self._client_cert, + ) + except ConnectionError: + return self._session.post( + url=self._endpoint, + headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None, + data=serialized_data, + verify=self._certificate_file, + timeout=self._timeout, + cert=self._client_cert, + ) + + @staticmethod + def _retryable(resp: requests.Response) -> bool: + """ + Is it a retryable response? + """ + if resp.status_code in (429, 503): + return True + + return OTLPLogExporter._retryable(resp) + + @staticmethod + def _parse_retryable_header(retry_header: Optional[str]) -> float: + """ + Converts the given Retry-After header into a delay in seconds; returns -1 if there is no header + or the header cannot be parsed + """ + if not retry_header: + return -1 + + try: + val = float(retry_header) + return val if val >= 0 else -1 + except ValueError: + return -1 diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py index 7defb5d47..98d649fd4 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py @@ -3,12 +3,12 @@ from typing import Dict, Optional, Sequence +from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession from amazon.opentelemetry.distro.llo_handler import LLOHandler -from amazon.opentelemetry.distro._utils import is_agent_observability_enabled -from opentelemetry.sdk._logs import LoggerProvider from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk._logs import LoggerProvider from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult @@ -25,7 +25,7 @@ def __init__( headers: Optional[Dict[str, str]] = None, timeout: Optional[int] = None, compression: Optional[Compression] = None, - logger_provider: Optional[LoggerProvider] = None + logger_provider: Optional[LoggerProvider] = None, ): self._aws_region = None diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py index bf4dabeb3..1ce90fd58 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py @@ -1,13 +1,13 @@ import logging import re - from typing import Any, Dict, List, Optional, Sequence -from opentelemetry.attributes import BoundedAttributes from opentelemetry._events import Event -from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.attributes import BoundedAttributes from opentelemetry.sdk._events import EventLoggerProvider -from opentelemetry.sdk.trace import ReadableSpan, Event as SpanEvent +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk.trace import Event as SpanEvent +from opentelemetry.sdk.trace import ReadableSpan # Message event types GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py index 70dfe36c4..26a6a3682 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py @@ -220,7 +220,7 @@ def extract_attributes(self, attributes: _AttributeMapT): knowledge_base_id = self._call_context.params.get(_KNOWLEDGE_BASE_ID) if knowledge_base_id: attributes[AWS_BEDROCK_KNOWLEDGE_BASE_ID] = knowledge_base_id - + def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None): # Currently no attributes to extract from the result pass diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_auth_session.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/common/test_aws_auth_session.py similarity index 100% rename from aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_auth_session.py rename to aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/common/test_aws_auth_session.py diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/logs/aws_batch_log_record_processor_test.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/logs/aws_batch_log_record_processor_test.py new file mode 100644 index 000000000..24b559d7d --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/logs/aws_batch_log_record_processor_test.py @@ -0,0 +1,136 @@ +import time +import unittest +from unittest.mock import MagicMock, patch + +from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import ( + BASE_LOG_BUFFER_BYTE_SIZE, + MAX_LOG_REQUEST_BYTE_SIZE, + AwsBatchLogRecordProcessor, + BatchLogExportStrategy, +) +from opentelemetry._logs.severity import SeverityNumber +from opentelemetry.sdk._logs import LogData, LogRecord +from opentelemetry.sdk._logs.export import LogExportResult +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.trace import TraceFlags + + +class TestAwsBatchLogRecordProcessor(unittest.TestCase): + + def setUp(self): + self.mock_exporter = MagicMock() + self.mock_exporter.export.return_value = LogExportResult.SUCCESS + + self.processor = AwsBatchLogRecordProcessor(exporter=self.mock_exporter) + self.logs = self.generate_test_log_data() + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach", + return_value=MagicMock(), + ) + 
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.detach") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value") + def test_export_single_batch_under_size_limit(self, _, __, ___): + """Tests that export is only called once if a single batch is under the size limit""" + log_count = 10 + test_logs = self.generate_test_log_data(count=log_count) + total_data_size = 0 + + for log in test_logs: + total_data_size += self.processor._get_size_of_log(log) + self.processor._queue.appendleft(log) + + self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL) + args, kwargs = self.mock_exporter.export.call_args + actual_batch = args[0] + + self.assertLess(total_data_size, MAX_LOG_REQUEST_BYTE_SIZE) + self.assertEqual(len(self.processor._queue), 0) + self.assertEqual(len(actual_batch), log_count) + self.mock_exporter.export.assert_called_once() + self.mock_exporter.set_gen_ai_flag.assert_not_called() + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach", + return_value=MagicMock(), + ) + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.detach") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value") + def test_export_single_batch_all_logs_over_size_limit(self, _, __, ___): + """Should make multiple export calls of batch size 1 to export logs of size > 1 MB""" + test_logs = self.generate_test_log_data(count=3, body_size=(MAX_LOG_REQUEST_BYTE_SIZE + 1)) + + for log in test_logs: + self.processor._queue.appendleft(log) + + self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL) + + self.assertEqual(len(self.processor._queue), 0) + self.assertEqual(self.mock_exporter.export.call_count, 3) + self.assertEqual(self.mock_exporter.set_gen_ai_flag.call_count, 3) + + batches = self.mock_exporter.export.call_args_list + + for batch in batches: + self.assertEquals(len(batch[0]), 1) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach", + return_value=MagicMock(), + ) + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.detach") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value") + def test_export_single_batch_some_logs_over_size_limit(self, _, __, ___): + """Should make calls to export smaller sub-batch logs""" + test_logs = self.generate_test_log_data(count=3, body_size=(MAX_LOG_REQUEST_BYTE_SIZE + 1)) + # 1st, 2nd, 3rd batch = size 1 + # 4th batch = size 10 + # 5th batch = size 2 + small_logs = self.generate_test_log_data(count=12, body_size=(104857 - BASE_LOG_BUFFER_BYTE_SIZE)) + + test_logs.extend(small_logs) + + for log in test_logs: + self.processor._queue.appendleft(log) + + self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL) + + self.assertEqual(len(self.processor._queue), 0) + self.assertEqual(self.mock_exporter.export.call_count, 5) + self.assertEqual(self.mock_exporter.set_gen_ai_flag.call_count, 3) + + batches = self.mock_exporter.export.call_args_list + + expected_sizes = { + 0: 1, # 1st batch (index 1) should have 1 log + 1: 1, # 2nd batch (index 1) should have 1 log + 2: 1, # 3rd batch (index 2) should have 1 log + 3: 10, # 4th batch (index 3) should have 10 logs + 4: 2, # 5th batch (index 4) should have 2 logs + } + + for i, call in enumerate(batches): + batch = call[0][0] + 
expected_size = expected_sizes[i] + self.assertEqual(len(batch), expected_size) + + def generate_test_log_data(self, count=5, body_size=100): + logs = [] + for i in range(count): + body = "X" * body_size + + record = LogRecord( + timestamp=int(time.time_ns()), + trace_id=int(f"0x{i + 1:032x}", 16), + span_id=int(f"0x{i + 1:016x}", 16), + trace_flags=TraceFlags(1), + severity_text="INFO", + severity_number=SeverityNumber.INFO, + body=body, + attributes={"test.attribute": f"value-{i + 1}"}, + ) + + log_data = LogData(log_record=record, instrumentation_scope=InstrumentationScope("test-scope", "1.0.0")) + logs.append(log_data) + + return logs diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/logs/otlp_aws_logs_exporter_test.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/logs/otlp_aws_logs_exporter_test.py new file mode 100644 index 000000000..fd7d1c431 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/logs/otlp_aws_logs_exporter_test.py @@ -0,0 +1,177 @@ +import time +from unittest import TestCase +from unittest.mock import patch + +import requests + +from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter +from opentelemetry._logs.severity import SeverityNumber +from opentelemetry.sdk._logs import LogData, LogRecord +from opentelemetry.sdk._logs.export import ( + LogExportResult, +) +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.trace import TraceFlags + + +class TestOTLPAwsLogsExporter(TestCase): + _ENDPOINT = "https://logs.us-west-2.amazonaws.com/v1/logs" + good_response = requests.Response() + good_response.status_code = 200 + + non_retryable_response = requests.Response() + non_retryable_response.status_code = 404 + + retryable_response_no_header = requests.Response() + retryable_response_no_header.status_code = 429 + + retryable_response_header = requests.Response() + retryable_response_header.headers = {"Retry-After": "10"} + retryable_response_header.status_code = 503 + + retryable_response_bad_header = requests.Response() + retryable_response_bad_header.headers = {"Retry-After": "-12"} + retryable_response_bad_header.status_code = 503 + + def setUp(self): + self.logs = self.generate_test_log_data() + self.exporter = OTLPAwsLogExporter(endpoint=self._ENDPOINT) + + @patch("requests.Session.request", return_value=good_response) + def test_export_success(self, mock_request): + """Tests that the exporter always compresses the serialized logs with gzip before exporting.""" + result = self.exporter.export(self.logs) + + mock_request.assert_called_once() + + _, kwargs = mock_request.call_args + data = kwargs.get("data", None) + + self.assertEqual(result, LogExportResult.SUCCESS) + + # Gzip first 10 bytes are reserved for metadata headers: + # https://www.loc.gov/preservation/digital/formats/fdd/fdd000599.shtml?loclr=blogsig + self.assertIsNotNone(data) + self.assertTrue(len(data) >= 10) + self.assertEqual(data[0:2], b"\x1f\x8b") + + @patch("requests.Session.request", return_value=good_response) + def test_export_gen_ai(self, mock_request): + """Tests that when gen_ai_flag is set, the exporter includes the x-aws-log-semantics header in the request.""" + self.exporter.set_gen_ai_flag() + result = self.exporter.export(self.logs) + + mock_request.assert_called_once() + + _, kwargs = mock_request.call_args + headers = kwargs.get("headers", None) + + self.assertEqual(result, LogExportResult.SUCCESS) + 
self.assertIsNotNone(headers) + self.assertIn("x-aws-log-semantics", headers) + self.assertEqual(headers["x-aws-log-semantics"], "otel") + + @patch("requests.Session.request", return_value=good_response) + def test_should_not_export_if_shutdown(self, mock_request): + """Tests that no export request is made if the exporter is shutdown.""" + self.exporter.shutdown() + result = self.exporter.export(self.logs) + + mock_request.assert_not_called() + self.assertEqual(result, LogExportResult.FAILURE) + + @patch("requests.Session.request", return_value=non_retryable_response) + def test_should_not_export_again_if_not_retryable(self, mock_request): + """Tests that only one export request is made if the response status code is non-retryable.""" + result = self.exporter.export(self.logs) + mock_request.assert_called_once() + + self.assertEqual(result, LogExportResult.FAILURE) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None + ) + @patch("requests.Session.request", return_value=retryable_response_no_header) + def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header(self, mock_request, mock_sleep): + """Tests that multiple export requests are made with exponential delay if the response status code is retryable. + But there is no Retry-After header.""" + result = self.exporter.export(self.logs) + + # 1, 2, 4, 8, 16, 32 delays + self.assertEqual(mock_sleep.call_count, 6) + + delays = mock_sleep.call_args_list + + for i in range(len(delays)): + self.assertEqual(delays[i][0][0], 2**i) + + # Number of calls: 1 + len(1, 2, 4, 8, 16, 32 delays) + self.assertEqual(mock_request.call_count, 7) + self.assertEqual(result, LogExportResult.FAILURE) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None + ) + @patch( + "requests.Session.request", + side_effect=[retryable_response_header, retryable_response_header, retryable_response_header, good_response], + ) + def test_should_export_again_with_server_delay_if_retryable_and_retry_after_header(self, mock_request, mock_sleep): + """Tests that multiple export requests are made with the server's suggested + delay if the response status code is retryable and there is a Retry-After header.""" + result = self.exporter.export(self.logs) + delays = mock_sleep.call_args_list + + for i in range(len(delays)): + self.assertEqual(delays[i][0][0], 10) + + self.assertEqual(mock_sleep.call_count, 3) + self.assertEqual(mock_request.call_count, 4) + self.assertEqual(result, LogExportResult.SUCCESS) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None + ) + @patch( + "requests.Session.request", + side_effect=[ + retryable_response_bad_header, + retryable_response_bad_header, + retryable_response_bad_header, + good_response, + ], + ) + def test_should_export_again_with_backoff_delay_if_retryable_and_bad_retry_after_header( + self, mock_request, mock_sleep + ): + """Tests that multiple export requests are made with exponential delay if the response status code is retryable. 
+ But the Retry-After header is invalid or malformed.""" + result = self.exporter.export(self.logs) + delays = mock_sleep.call_args_list + + for i in range(len(delays)): + self.assertEqual(delays[i][0][0], 2**i) + + self.assertEqual(mock_sleep.call_count, 3) + self.assertEqual(mock_request.call_count, 4) + self.assertEqual(result, LogExportResult.SUCCESS) + + def generate_test_log_data(self, count=5): + logs = [] + for i in range(count): + record = LogRecord( + timestamp=int(time.time_ns()), + trace_id=int(f"0x{i + 1:032x}", 16), + span_id=int(f"0x{i + 1:016x}", 16), + trace_flags=TraceFlags(1), + severity_text="INFO", + severity_number=SeverityNumber.INFO, + body=f"Test log {i + 1}", + attributes={"test.attribute": f"value-{i + 1}"}, + ) + + log_data = LogData(log_record=record, instrumentation_scope=InstrumentationScope("test-scope", "1.0.0")) + + logs.append(log_data) + + return logs diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py index 5d37f03a4..636523e6b 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py @@ -1,5 +1,5 @@ from unittest import TestCase -from unittest.mock import MagicMock, patch, call +from unittest.mock import MagicMock, call, patch from amazon.opentelemetry.distro.llo_handler import LLOHandler from opentelemetry._events import Event
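For reference, a minimal wiring sketch (not part of the patch above) of how the new OTLPAwsLogExporter and AwsBatchLogRecordProcessor fit together, mirroring the _init_logging change in aws_opentelemetry_configurator.py. The endpoint value is illustrative only (it matches the one used in the exporter tests above):

from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import AwsBatchLogRecordProcessor
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider

# Illustrative endpoint; in practice the distro derives it from configuration.
exporter = OTLPAwsLogExporter(endpoint="https://logs.us-west-2.amazonaws.com/v1/logs")
provider = LoggerProvider()
# The AWS processor splits each export pass into sub-batches that stay within the
# 1 MB CloudWatch OTLP request limit before handing them to the exporter.
provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=exporter))
set_logger_provider(provider)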