Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t
([#501](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/501))
- Fix Gevent patch regression with correct import order
([#522](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/522))
- Bump ADOT Python version to 0.13.0 and OTel dependencies to 1.37.0/0.58b0
([#524](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/524))
110 changes: 55 additions & 55 deletions aws-opentelemetry-distro/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,63 +26,63 @@ classifiers = [
]

dependencies = [
"opentelemetry-api == 1.33.1",
"opentelemetry-sdk == 1.33.1",
"opentelemetry-exporter-otlp-proto-grpc == 1.33.1",
"opentelemetry-exporter-otlp-proto-http == 1.33.1",
"opentelemetry-propagator-b3 == 1.33.1",
"opentelemetry-propagator-jaeger == 1.33.1",
"opentelemetry-exporter-otlp-proto-common == 1.33.1",
"opentelemetry-api == 1.37.0",
"opentelemetry-sdk == 1.37.0",
"opentelemetry-exporter-otlp-proto-grpc == 1.37.0",
"opentelemetry-exporter-otlp-proto-http == 1.37.0",
"opentelemetry-propagator-b3 == 1.37.0",
"opentelemetry-propagator-jaeger == 1.37.0",
"opentelemetry-exporter-otlp-proto-common == 1.37.0",
"opentelemetry-sdk-extension-aws == 2.0.2",
"opentelemetry-propagator-aws-xray == 1.0.1",
"opentelemetry-distro == 0.54b1",
"opentelemetry-processor-baggage == 0.54b1",
"opentelemetry-propagator-ot-trace == 0.54b1",
"opentelemetry-instrumentation == 0.54b1",
"opentelemetry-instrumentation-aws-lambda == 0.54b1",
"opentelemetry-instrumentation-aio-pika == 0.54b1",
"opentelemetry-instrumentation-aiohttp-client == 0.54b1",
"opentelemetry-instrumentation-aiokafka == 0.54b1",
"opentelemetry-instrumentation-aiopg == 0.54b1",
"opentelemetry-instrumentation-asgi == 0.54b1",
"opentelemetry-instrumentation-asyncpg == 0.54b1",
"opentelemetry-instrumentation-boto == 0.54b1",
"opentelemetry-instrumentation-boto3sqs == 0.54b1",
"opentelemetry-instrumentation-botocore == 0.54b1",
"opentelemetry-instrumentation-celery == 0.54b1",
"opentelemetry-instrumentation-confluent-kafka == 0.54b1",
"opentelemetry-instrumentation-dbapi == 0.54b1",
"opentelemetry-instrumentation-django == 0.54b1",
"opentelemetry-instrumentation-elasticsearch == 0.54b1",
"opentelemetry-instrumentation-falcon == 0.54b1",
"opentelemetry-instrumentation-fastapi == 0.54b1",
"opentelemetry-instrumentation-flask == 0.54b1",
"opentelemetry-instrumentation-grpc == 0.54b1",
"opentelemetry-instrumentation-httpx == 0.54b1",
"opentelemetry-instrumentation-jinja2 == 0.54b1",
"opentelemetry-instrumentation-kafka-python == 0.54b1",
"opentelemetry-instrumentation-logging == 0.54b1",
"opentelemetry-instrumentation-mysql == 0.54b1",
"opentelemetry-instrumentation-mysqlclient == 0.54b1",
"opentelemetry-instrumentation-pika == 0.54b1",
"opentelemetry-instrumentation-psycopg2 == 0.54b1",
"opentelemetry-instrumentation-pymemcache == 0.54b1",
"opentelemetry-instrumentation-pymongo == 0.54b1",
"opentelemetry-instrumentation-pymysql == 0.54b1",
"opentelemetry-instrumentation-pyramid == 0.54b1",
"opentelemetry-instrumentation-redis == 0.54b1",
"opentelemetry-instrumentation-remoulade == 0.54b1",
"opentelemetry-instrumentation-requests == 0.54b1",
"opentelemetry-instrumentation-sqlalchemy == 0.54b1",
"opentelemetry-instrumentation-sqlite3 == 0.54b1",
"opentelemetry-instrumentation-starlette == 0.54b1",
"opentelemetry-instrumentation-system-metrics == 0.54b1",
"opentelemetry-instrumentation-tornado == 0.54b1",
"opentelemetry-instrumentation-tortoiseorm == 0.54b1",
"opentelemetry-instrumentation-urllib == 0.54b1",
"opentelemetry-instrumentation-urllib3 == 0.54b1",
"opentelemetry-instrumentation-wsgi == 0.54b1",
"opentelemetry-instrumentation-cassandra == 0.54b1",
"opentelemetry-distro == 0.58b0",
"opentelemetry-processor-baggage == 0.58b0",
"opentelemetry-propagator-ot-trace == 0.58b0",
"opentelemetry-instrumentation == 0.58b0",
"opentelemetry-instrumentation-aws-lambda == 0.58b0",
"opentelemetry-instrumentation-aio-pika == 0.58b0",
"opentelemetry-instrumentation-aiohttp-client == 0.58b0",
"opentelemetry-instrumentation-aiokafka == 0.58b0",
"opentelemetry-instrumentation-aiopg == 0.58b0",
"opentelemetry-instrumentation-asgi == 0.58b0",
"opentelemetry-instrumentation-asyncpg == 0.58b0",
"opentelemetry-instrumentation-boto == 0.58b0",
"opentelemetry-instrumentation-boto3sqs == 0.58b0",
"opentelemetry-instrumentation-botocore == 0.58b0",
"opentelemetry-instrumentation-celery == 0.58b0",
"opentelemetry-instrumentation-confluent-kafka == 0.58b0",
"opentelemetry-instrumentation-dbapi == 0.58b0",
"opentelemetry-instrumentation-django == 0.58b0",
"opentelemetry-instrumentation-elasticsearch == 0.58b0",
"opentelemetry-instrumentation-falcon == 0.58b0",
"opentelemetry-instrumentation-fastapi == 0.58b0",
"opentelemetry-instrumentation-flask == 0.58b0",
"opentelemetry-instrumentation-grpc == 0.58b0",
"opentelemetry-instrumentation-httpx == 0.58b0",
"opentelemetry-instrumentation-jinja2 == 0.58b0",
"opentelemetry-instrumentation-kafka-python == 0.58b0",
"opentelemetry-instrumentation-logging == 0.58b0",
"opentelemetry-instrumentation-mysql == 0.58b0",
"opentelemetry-instrumentation-mysqlclient == 0.58b0",
"opentelemetry-instrumentation-pika == 0.58b0",
"opentelemetry-instrumentation-psycopg2 == 0.58b0",
"opentelemetry-instrumentation-pymemcache == 0.58b0",
"opentelemetry-instrumentation-pymongo == 0.58b0",
"opentelemetry-instrumentation-pymysql == 0.58b0",
"opentelemetry-instrumentation-pyramid == 0.58b0",
"opentelemetry-instrumentation-redis == 0.58b0",
"opentelemetry-instrumentation-remoulade == 0.58b0",
"opentelemetry-instrumentation-requests == 0.58b0",
"opentelemetry-instrumentation-sqlalchemy == 0.58b0",
"opentelemetry-instrumentation-sqlite3 == 0.58b0",
"opentelemetry-instrumentation-starlette == 0.58b0",
"opentelemetry-instrumentation-system-metrics == 0.58b0",
"opentelemetry-instrumentation-tornado == 0.58b0",
"opentelemetry-instrumentation-tortoiseorm == 0.58b0",
"opentelemetry-instrumentation-urllib == 0.58b0",
"opentelemetry-instrumentation-urllib3 == 0.58b0",
"opentelemetry-instrumentation-wsgi == 0.58b0",
"opentelemetry-instrumentation-cassandra == 0.58b0",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@


class BatchUnsampledSpanProcessor(BaseBatchSpanProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._spans_dropped = False

# pylint: disable=no-self-use
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
Expand All @@ -22,19 +25,18 @@ def on_end(self, span: ReadableSpan) -> None:
if span.context.trace_flags.sampled:
return

if self.done:
if self._batch_processor._shutdown:
logger.warning("Already shutdown, dropping span.")
return

if len(self.queue) == self.max_queue_size:
if len(self._batch_processor._queue) == self._batch_processor._max_queue_size:
# pylint: disable=access-member-before-definition
if not self._spans_dropped:
logger.warning("Queue is full, likely spans will be dropped.")
# pylint: disable=attribute-defined-outside-init
self._spans_dropped = True

self.queue.appendleft(span)
self._batch_processor._queue.appendleft(span)

if len(self.queue) >= self.max_export_batch_size:
with self.condition:
self.condition.notify()
if len(self._batch_processor._queue) >= self._batch_processor._max_export_batch_size:
self._batch_processor._worker_awaken.set()
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
_import_id_generator,
_import_sampler,
_OTelSDKConfigurator,
_patch_basic_config,
_overwrite_logging_config_fns,
)
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
Expand Down Expand Up @@ -230,11 +230,10 @@ def _init_logging(
set_event_logger_provider(event_logger_provider)

if setup_logging_handler:
_patch_basic_config()

# Add OTel handler
handler = LoggingHandler(level=logging.NOTSET, logger_provider=provider)
logging.getLogger().addHandler(handler)
_overwrite_logging_config_fns(handler)


def _init_tracing(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk._shared_internal import BatchExportStrategy
from opentelemetry.util.types import AnyValue

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -89,7 +89,7 @@ def __init__(

self._exporter = exporter

def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
def _export(self, batch_strategy: BatchExportStrategy) -> None:
"""
Explicitly overrides upstream _export method to add AWS CloudWatch size-based batching
See:
Expand All @@ -102,20 +102,20 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
- Estimated data size of exported batches will typically be <= 1 MB except for the case below:
- If the estimated data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
"""
with self._export_lock:
with self._batch_processor._export_lock:
iteration = 0
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
# once the lock is obtained to see if we still need to make the requested export.
while self._should_export_batch(batch_strategy, iteration):
while self._batch_processor._should_export_batch(batch_strategy, iteration):
iteration += 1
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
batch_length = min(self._max_export_batch_size, len(self._queue))
batch_length = min(self._batch_processor._max_export_batch_size, len(self._batch_processor._queue))
batch_data_size = 0
batch = []

for _ in range(batch_length):
log_data: LogData = self._queue.pop()
log_data: LogData = self._batch_processor._queue.pop()
log_size = self._estimate_log_size(log_data)

if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
Expand Down Expand Up @@ -252,7 +252,7 @@ def _estimate_utf8_size(s: str):
# https://github.com/open-telemetry/opentelemetry-python/issues/3193
# https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L199
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
if self._shutdown:
if self._batch_processor._shutdown:
return False
self._export(BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH)
self._export(BatchExportStrategy.EXPORT_AT_LEAST_ONE_BATCH)
return True
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._common import _is_retryable
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs.export import LogExportResult
Expand Down Expand Up @@ -171,7 +172,7 @@ def _retryable(resp: Response) -> bool:
"""
# See: https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling

return resp.status_code in (429, 503) or OTLPLogExporter._retryable(resp)
return resp.status_code in (429, 503) or _is_retryable(resp)

def _get_retry_delay_sec(self, headers: CaseInsensitiveDict, retry_num: int) -> float:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

__version__ = "0.12.1.dev0"
__version__ = "0.13.0.dev0"
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
AwsCloudWatchOtlpBatchLogRecordProcessor,
BatchLogExportStrategy,
)
from opentelemetry._logs.severity import SeverityNumber
from opentelemetry.sdk._logs import LogData, LogRecord
from opentelemetry.sdk._logs.export import LogExportResult
from opentelemetry.sdk._shared_internal import BatchExportStrategy
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.trace import TraceFlags
from opentelemetry.util.types import AnyValue
Expand Down Expand Up @@ -151,14 +151,14 @@ def test_export_single_batch_under_size_limit(self, _, __, ___):
for log in test_logs:
size = self.processor._estimate_log_size(log)
total_data_size += size
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL)
self.processor._export(batch_strategy=BatchExportStrategy.EXPORT_ALL)
args, _ = self.mock_exporter.export.call_args
actual_batch = args[0]

self.assertLess(total_data_size, self.processor._MAX_LOG_REQUEST_BYTE_SIZE)
self.assertEqual(len(self.processor._queue), 0)
self.assertEqual(len(self.processor._batch_processor._queue), 0)
self.assertEqual(len(actual_batch), log_count)
self.mock_exporter.export.assert_called_once()

Expand All @@ -175,11 +175,11 @@ def test_export_single_batch_all_logs_over_size_limit(self, _, __, ___):
test_logs = self.generate_test_log_data(log_body=large_log_body, count=15)

for log in test_logs:
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL)
self.processor._export(batch_strategy=BatchExportStrategy.EXPORT_ALL)

self.assertEqual(len(self.processor._queue), 0)
self.assertEqual(len(self.processor._batch_processor._queue), 0)
self.assertEqual(self.mock_exporter.export.call_count, len(test_logs))

batches = self.mock_exporter.export.call_args_list
Expand Down Expand Up @@ -209,11 +209,11 @@ def test_export_single_batch_some_logs_over_size_limit(self, _, __, ___):
test_logs = large_logs + small_logs

for log in test_logs:
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL)
self.processor._export(batch_strategy=BatchExportStrategy.EXPORT_ALL)

self.assertEqual(len(self.processor._queue), 0)
self.assertEqual(len(self.processor._batch_processor._queue), 0)
self.assertEqual(self.mock_exporter.export.call_count, 5)

batches = self.mock_exporter.export.call_args_list
Expand Down Expand Up @@ -249,22 +249,22 @@ def test_force_flush_returns_false_when_shutdown(self):
def test_force_flush_exports_only_one_batch(self, _, __, ___):
"""Tests that force_flush should try to at least export one batch of logs. Rest of the logs will be dropped"""
# Set max_export_batch_size to 5 to limit batch size
self.processor._max_export_batch_size = 5
self.processor._shutdown = False
self.processor._batch_processor._max_export_batch_size = 5
self.processor._batch_processor._shutdown = False

# Add 6 logs to queue, after the export there should be 1 log remaining
log_count = 6
test_logs = self.generate_test_log_data(log_body="test message", count=log_count)

for log in test_logs:
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

self.assertEqual(len(self.processor._queue), log_count)
self.assertEqual(len(self.processor._batch_processor._queue), log_count)

result = self.processor.force_flush()

self.assertTrue(result)
self.assertEqual(len(self.processor._queue), 1)
self.assertEqual(len(self.processor._batch_processor._queue), 1)
self.mock_exporter.export.assert_called_once()

# Verify only one batch of 5 logs was exported
Expand All @@ -287,18 +287,18 @@ def test_export_handles_exception_gracefully(self, mock_logger, _, __, ___):
# Add logs to queue
test_logs = self.generate_test_log_data(log_body="test message", count=2)
for log in test_logs:
self.processor._queue.appendleft(log)
self.processor._batch_processor._queue.appendleft(log)

# Call _export - should not raise exception
self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL)
self.processor._export(batch_strategy=BatchExportStrategy.EXPORT_ALL)

# Verify exception was logged
mock_logger.exception.assert_called_once()
call_args = mock_logger.exception.call_args[0]
self.assertIn("Exception while exporting logs:", call_args[0])

# Queue should be empty even though export failed
self.assertEqual(len(self.processor._queue), 0)
self.assertEqual(len(self.processor._batch_processor._queue), 0)

@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor._logger")
def test_estimate_log_size_debug_logging_on_depth_exceeded(self, mock_logger):
Expand Down Expand Up @@ -350,7 +350,7 @@ def test_constructor_with_custom_parameters(self):
self.assertEqual(custom_processor._exporter, self.mock_exporter)

# Verify parameters are passed to parent constructor
self.assertEqual(custom_processor._max_export_batch_size, 100)
self.assertEqual(custom_processor._batch_processor._max_export_batch_size, 100)

@staticmethod
def generate_test_log_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def _test_unpatched_botocore_instrumentation(self):
self.assertTrue("sns" in _KNOWN_EXTENSIONS, "Upstream has removed the SNS extension")

# StepFunctions
self.assertFalse("stepfunctions" in _KNOWN_EXTENSIONS, "Upstream has added a StepFunctions extension")
self.assertTrue("stepfunctions" in _KNOWN_EXTENSIONS, "Upstream has removed the StepFunctions extension")

# Lambda
self.assertTrue("lambda" in _KNOWN_EXTENSIONS, "Upstream has removed the Lambda extension")
Expand Down Expand Up @@ -832,7 +832,7 @@ def _test_unpatched_starlette_instrumentation(self):
instrumentor = StarletteInstrumentor()
deps = original_deps(instrumentor)
# Default should have version constraint
self.assertEqual(deps, ("starlette >= 0.13, <0.15",))
self.assertEqual(deps, ("starlette >= 0.13",))
except ImportError:
# If starlette instrumentation is not installed, skip this test
pass
Expand Down
Loading