Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t
([#497](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/497))
- Fix timeout handling for exceeded deadline in retry logic in OTLPAwsLogsExporter
([#501](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/501))
- Bump ADOT Python version to 0.13.0 and OTel dependencies to 1.37.0/0.58b0
([#524](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/524))
110 changes: 55 additions & 55 deletions aws-opentelemetry-distro/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,63 +26,63 @@ classifiers = [
]

dependencies = [
"opentelemetry-api == 1.33.1",
"opentelemetry-sdk == 1.33.1",
"opentelemetry-exporter-otlp-proto-grpc == 1.33.1",
"opentelemetry-exporter-otlp-proto-http == 1.33.1",
"opentelemetry-propagator-b3 == 1.33.1",
"opentelemetry-propagator-jaeger == 1.33.1",
"opentelemetry-exporter-otlp-proto-common == 1.33.1",
"opentelemetry-api == 1.37.0",
"opentelemetry-sdk == 1.37.0",
"opentelemetry-exporter-otlp-proto-grpc == 1.37.0",
"opentelemetry-exporter-otlp-proto-http == 1.37.0",
"opentelemetry-propagator-b3 == 1.37.0",
"opentelemetry-propagator-jaeger == 1.37.0",
"opentelemetry-exporter-otlp-proto-common == 1.37.0",
"opentelemetry-sdk-extension-aws == 2.0.2",
"opentelemetry-propagator-aws-xray == 1.0.1",
"opentelemetry-distro == 0.54b1",
"opentelemetry-processor-baggage == 0.54b1",
"opentelemetry-propagator-ot-trace == 0.54b1",
"opentelemetry-instrumentation == 0.54b1",
"opentelemetry-instrumentation-aws-lambda == 0.54b1",
"opentelemetry-instrumentation-aio-pika == 0.54b1",
"opentelemetry-instrumentation-aiohttp-client == 0.54b1",
"opentelemetry-instrumentation-aiokafka == 0.54b1",
"opentelemetry-instrumentation-aiopg == 0.54b1",
"opentelemetry-instrumentation-asgi == 0.54b1",
"opentelemetry-instrumentation-asyncpg == 0.54b1",
"opentelemetry-instrumentation-boto == 0.54b1",
"opentelemetry-instrumentation-boto3sqs == 0.54b1",
"opentelemetry-instrumentation-botocore == 0.54b1",
"opentelemetry-instrumentation-celery == 0.54b1",
"opentelemetry-instrumentation-confluent-kafka == 0.54b1",
"opentelemetry-instrumentation-dbapi == 0.54b1",
"opentelemetry-instrumentation-django == 0.54b1",
"opentelemetry-instrumentation-elasticsearch == 0.54b1",
"opentelemetry-instrumentation-falcon == 0.54b1",
"opentelemetry-instrumentation-fastapi == 0.54b1",
"opentelemetry-instrumentation-flask == 0.54b1",
"opentelemetry-instrumentation-grpc == 0.54b1",
"opentelemetry-instrumentation-httpx == 0.54b1",
"opentelemetry-instrumentation-jinja2 == 0.54b1",
"opentelemetry-instrumentation-kafka-python == 0.54b1",
"opentelemetry-instrumentation-logging == 0.54b1",
"opentelemetry-instrumentation-mysql == 0.54b1",
"opentelemetry-instrumentation-mysqlclient == 0.54b1",
"opentelemetry-instrumentation-pika == 0.54b1",
"opentelemetry-instrumentation-psycopg2 == 0.54b1",
"opentelemetry-instrumentation-pymemcache == 0.54b1",
"opentelemetry-instrumentation-pymongo == 0.54b1",
"opentelemetry-instrumentation-pymysql == 0.54b1",
"opentelemetry-instrumentation-pyramid == 0.54b1",
"opentelemetry-instrumentation-redis == 0.54b1",
"opentelemetry-instrumentation-remoulade == 0.54b1",
"opentelemetry-instrumentation-requests == 0.54b1",
"opentelemetry-instrumentation-sqlalchemy == 0.54b1",
"opentelemetry-instrumentation-sqlite3 == 0.54b1",
"opentelemetry-instrumentation-starlette == 0.54b1",
"opentelemetry-instrumentation-system-metrics == 0.54b1",
"opentelemetry-instrumentation-tornado == 0.54b1",
"opentelemetry-instrumentation-tortoiseorm == 0.54b1",
"opentelemetry-instrumentation-urllib == 0.54b1",
"opentelemetry-instrumentation-urllib3 == 0.54b1",
"opentelemetry-instrumentation-wsgi == 0.54b1",
"opentelemetry-instrumentation-cassandra == 0.54b1",
"opentelemetry-distro == 0.58b0",
"opentelemetry-processor-baggage == 0.58b0",
"opentelemetry-propagator-ot-trace == 0.58b0",
"opentelemetry-instrumentation == 0.58b0",
"opentelemetry-instrumentation-aws-lambda == 0.58b0",
"opentelemetry-instrumentation-aio-pika == 0.58b0",
"opentelemetry-instrumentation-aiohttp-client == 0.58b0",
"opentelemetry-instrumentation-aiokafka == 0.58b0",
"opentelemetry-instrumentation-aiopg == 0.58b0",
"opentelemetry-instrumentation-asgi == 0.58b0",
"opentelemetry-instrumentation-asyncpg == 0.58b0",
"opentelemetry-instrumentation-boto == 0.58b0",
"opentelemetry-instrumentation-boto3sqs == 0.58b0",
"opentelemetry-instrumentation-botocore == 0.58b0",
"opentelemetry-instrumentation-celery == 0.58b0",
"opentelemetry-instrumentation-confluent-kafka == 0.58b0",
"opentelemetry-instrumentation-dbapi == 0.58b0",
"opentelemetry-instrumentation-django == 0.58b0",
"opentelemetry-instrumentation-elasticsearch == 0.58b0",
"opentelemetry-instrumentation-falcon == 0.58b0",
"opentelemetry-instrumentation-fastapi == 0.58b0",
"opentelemetry-instrumentation-flask == 0.58b0",
"opentelemetry-instrumentation-grpc == 0.58b0",
"opentelemetry-instrumentation-httpx == 0.58b0",
"opentelemetry-instrumentation-jinja2 == 0.58b0",
"opentelemetry-instrumentation-kafka-python == 0.58b0",
"opentelemetry-instrumentation-logging == 0.58b0",
"opentelemetry-instrumentation-mysql == 0.58b0",
"opentelemetry-instrumentation-mysqlclient == 0.58b0",
"opentelemetry-instrumentation-pika == 0.58b0",
"opentelemetry-instrumentation-psycopg2 == 0.58b0",
"opentelemetry-instrumentation-pymemcache == 0.58b0",
"opentelemetry-instrumentation-pymongo == 0.58b0",
"opentelemetry-instrumentation-pymysql == 0.58b0",
"opentelemetry-instrumentation-pyramid == 0.58b0",
"opentelemetry-instrumentation-redis == 0.58b0",
"opentelemetry-instrumentation-remoulade == 0.58b0",
"opentelemetry-instrumentation-requests == 0.58b0",
"opentelemetry-instrumentation-sqlalchemy == 0.58b0",
"opentelemetry-instrumentation-sqlite3 == 0.58b0",
"opentelemetry-instrumentation-starlette == 0.58b0",
"opentelemetry-instrumentation-system-metrics == 0.58b0",
"opentelemetry-instrumentation-tornado == 0.58b0",
"opentelemetry-instrumentation-tortoiseorm == 0.58b0",
"opentelemetry-instrumentation-urllib == 0.58b0",
"opentelemetry-instrumentation-urllib3 == 0.58b0",
"opentelemetry-instrumentation-wsgi == 0.58b0",
"opentelemetry-instrumentation-cassandra == 0.58b0",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@


class BatchUnsampledSpanProcessor(BaseBatchSpanProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._spans_dropped = False

# pylint: disable=no-self-use
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
Expand All @@ -22,19 +25,18 @@ def on_end(self, span: ReadableSpan) -> None:
if span.context.trace_flags.sampled:
return

if self.done:
if self._batch_processor._shutdown:
logger.warning("Already shutdown, dropping span.")
return

if len(self.queue) == self.max_queue_size:
if len(self._batch_processor._queue) == self._batch_processor._max_queue_size:
# pylint: disable=access-member-before-definition
if not self._spans_dropped:
logger.warning("Queue is full, likely spans will be dropped.")
# pylint: disable=attribute-defined-outside-init
self._spans_dropped = True

self.queue.appendleft(span)
self._batch_processor._queue.appendleft(span)

if len(self.queue) >= self.max_export_batch_size:
with self.condition:
self.condition.notify()
if len(self._batch_processor._queue) >= self._batch_processor._max_export_batch_size:
self._batch_processor._worker_awaken.set()
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
_import_id_generator,
_import_sampler,
_OTelSDKConfigurator,
_patch_basic_config,
_overwrite_logging_config_fns,
)
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
Expand Down Expand Up @@ -230,11 +230,10 @@ def _init_logging(
set_event_logger_provider(event_logger_provider)

if setup_logging_handler:
_patch_basic_config()

# Add OTel handler
handler = LoggingHandler(level=logging.NOTSET, logger_provider=provider)
logging.getLogger().addHandler(handler)
_overwrite_logging_config_fns(handler)


def _init_tracing(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk._shared_internal import BatchExportStrategy
from opentelemetry.util.types import AnyValue

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -89,7 +89,7 @@ def __init__(

self._exporter = exporter

def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
def _export(self, batch_strategy: BatchExportStrategy) -> None:
"""
Explicitly overrides upstream _export method to add AWS CloudWatch size-based batching
See:
Expand All @@ -102,20 +102,20 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
- Estimated data size of exported batches will typically be <= 1 MB except for the case below:
- If the estimated data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
"""
with self._export_lock:
with self._batch_processor._export_lock:
iteration = 0
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
# once the lock is obtained to see if we still need to make the requested export.
while self._should_export_batch(batch_strategy, iteration):
while self._batch_processor._should_export_batch(batch_strategy, iteration):
iteration += 1
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
batch_length = min(self._max_export_batch_size, len(self._queue))
batch_length = min(self._batch_processor._max_export_batch_size, len(self._batch_processor._queue))
batch_data_size = 0
batch = []

for _ in range(batch_length):
log_data: LogData = self._queue.pop()
log_data: LogData = self._batch_processor._queue.pop()
log_size = self._estimate_log_size(log_data)

if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
Expand Down Expand Up @@ -252,7 +252,7 @@ def _estimate_utf8_size(s: str):
# https://github.com/open-telemetry/opentelemetry-python/issues/3193
# https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L199
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
if self._shutdown:
if self._batch_processor._shutdown:
return False
self._export(BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH)
self._export(BatchExportStrategy.EXPORT_AT_LEAST_ONE_BATCH)
return True
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._common import _is_retryable
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs.export import LogExportResult
Expand Down Expand Up @@ -171,7 +172,7 @@ def _retryable(resp: Response) -> bool:
"""
# See: https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling

return resp.status_code in (429, 503) or OTLPLogExporter._retryable(resp)
return resp.status_code in (429, 503) or _is_retryable(resp)

def _get_retry_delay_sec(self, headers: CaseInsensitiveDict, retry_num: int) -> float:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

__version__ = "0.12.1.dev0"
__version__ = "0.13.0.dev0"
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
AwsCloudWatchOtlpBatchLogRecordProcessor,
BatchLogExportStrategy,
)
from opentelemetry._logs.severity import SeverityNumber
from opentelemetry.sdk._logs import LogData, LogRecord
from opentelemetry.sdk._logs.export import LogExportResult
from opentelemetry.sdk._shared_internal import BatchExportStrategy
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.trace import TraceFlags
from opentelemetry.util.types import AnyValue
Expand Down Expand Up @@ -151,14 +151,14 @@ def test_export_single_batch_under_size_limit(self, _, __, ___):
for log in test_logs:
size = self.processor._estimate_log_size(log)
total_data_size += size
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL)
self.processor._export(batch_strategy=BatchExportStrategy.EXPORT_ALL)
args, _ = self.mock_exporter.export.call_args
actual_batch = args[0]

self.assertLess(total_data_size, self.processor._MAX_LOG_REQUEST_BYTE_SIZE)
self.assertEqual(len(self.processor._queue), 0)
self.assertEqual(len(self.processor._batch_processor._queue), 0)
self.assertEqual(len(actual_batch), log_count)
self.mock_exporter.export.assert_called_once()

Expand All @@ -175,11 +175,11 @@ def test_export_single_batch_all_logs_over_size_limit(self, _, __, ___):
test_logs = self.generate_test_log_data(log_body=large_log_body, count=15)

for log in test_logs:
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL)
self.processor._export(batch_strategy=BatchExportStrategy.EXPORT_ALL)

self.assertEqual(len(self.processor._queue), 0)
self.assertEqual(len(self.processor._batch_processor._queue), 0)
self.assertEqual(self.mock_exporter.export.call_count, len(test_logs))

batches = self.mock_exporter.export.call_args_list
Expand Down Expand Up @@ -209,11 +209,11 @@ def test_export_single_batch_some_logs_over_size_limit(self, _, __, ___):
test_logs = large_logs + small_logs

for log in test_logs:
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL)
self.processor._export(batch_strategy=BatchExportStrategy.EXPORT_ALL)

self.assertEqual(len(self.processor._queue), 0)
self.assertEqual(len(self.processor._batch_processor._queue), 0)
self.assertEqual(self.mock_exporter.export.call_count, 5)

batches = self.mock_exporter.export.call_args_list
Expand Down Expand Up @@ -249,22 +249,22 @@ def test_force_flush_returns_false_when_shutdown(self):
def test_force_flush_exports_only_one_batch(self, _, __, ___):
"""Tests that force_flush should try to at least export one batch of logs. Rest of the logs will be dropped"""
# Set max_export_batch_size to 5 to limit batch size
self.processor._max_export_batch_size = 5
self.processor._shutdown = False
self.processor._batch_processor._max_export_batch_size = 5
self.processor._batch_processor._shutdown = False

# Add 6 logs to queue, after the export there should be 1 log remaining
log_count = 6
test_logs = self.generate_test_log_data(log_body="test message", count=log_count)

for log in test_logs:
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

self.assertEqual(len(self.processor._queue), log_count)
self.assertEqual(len(self.processor._batch_processor._queue), log_count)

result = self.processor.force_flush()

self.assertTrue(result)
self.assertEqual(len(self.processor._queue), 1)
self.assertEqual(len(self.processor._batch_processor._queue), 1)
self.mock_exporter.export.assert_called_once()

# Verify only one batch of 5 logs was exported
Expand All @@ -287,18 +287,18 @@ def test_export_handles_exception_gracefully(self, mock_logger, _, __, ___):
# Add logs to queue
test_logs = self.generate_test_log_data(log_body="test message", count=2)
for log in test_logs:
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

# Call _export - should not raise exception
self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL)
self.processor._export(batch_strategy=BatchExportStrategy.EXPORT_ALL)

# Verify exception was logged
mock_logger.exception.assert_called_once()
call_args = mock_logger.exception.call_args[0]
self.assertIn("Exception while exporting logs:", call_args[0])

# Queue should be empty even though export failed
self.assertEqual(len(self.processor._queue), 0)
self.assertEqual(len(self.processor._batch_processor._queue), 0)

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor._logger")
def test_estimate_log_size_debug_logging_on_depth_exceeded(self, mock_logger):
Expand Down Expand Up @@ -350,7 +350,7 @@ def test_constructor_with_custom_parameters(self):
self.assertEqual(custom_processor._exporter, self.mock_exporter)

# Verify parameters are passed to parent constructor
self.assertEqual(custom_processor._max_export_batch_size, 100)
self.assertEqual(custom_processor._batch_processor._max_export_batch_size, 100)

@staticmethod
def generate_test_log_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def _test_unpatched_botocore_instrumentation(self):
self.assertTrue("sns" in _KNOWN_EXTENSIONS, "Upstream has removed the SNS extension")

# StepFunctions
self.assertFalse("stepfunctions" in _KNOWN_EXTENSIONS, "Upstream has added a StepFunctions extension")
self.assertTrue("stepfunctions" in _KNOWN_EXTENSIONS, "Upstream has removed the StepFunctions extension")

# Lambda
self.assertTrue("lambda" in _KNOWN_EXTENSIONS, "Upstream has removed the Lambda extension")
Expand Down Expand Up @@ -832,7 +832,7 @@ def _test_unpatched_starlette_instrumentation(self):
instrumentor = StarletteInstrumentor()
deps = original_deps(instrumentor)
# Default should have version constraint
self.assertEqual(deps, ("starlette >= 0.13, <0.15",))
self.assertEqual(deps, ("starlette >= 0.13",))
except ImportError:
# If starlette instrumentation is not installed, skip this test
pass
Expand Down
Loading