Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _configure(self, **kwargs):
os.environ.setdefault(
OTEL_PYTHON_DISABLED_INSTRUMENTATIONS,
"http,sqlalchemy,psycopg2,pymysql,sqlite3,aiopg,asyncpg,mysql_connector,"
"botocore,boto3,urllib3,requests,starlette",
"urllib3,requests,starlette,system_metrics,google-genai",
)

# Set logging auto instrumentation default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from botocore.exceptions import ClientError
from botocore.session import Session

from opentelemetry.instrumentation.utils import suppress_instrumentation

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -118,27 +120,29 @@ def _generate_log_stream_name(self) -> str:

def _create_log_group_if_needed(self):
"""Create log group if it doesn't exist."""
try:
self.logs_client.create_log_group(logGroupName=self.log_group_name)
logger.info("Created log group: %s", self.log_group_name)
except ClientError as error:
if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
logger.debug("Log group %s already exists", self.log_group_name)
else:
logger.error("Failed to create log group %s : %s", self.log_group_name, error)
raise
with suppress_instrumentation():
try:
self.logs_client.create_log_group(logGroupName=self.log_group_name)
logger.info("Created log group: %s", self.log_group_name)
except ClientError as error:
if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
logger.debug("Log group %s already exists", self.log_group_name)
else:
logger.error("Failed to create log group %s : %s", self.log_group_name, error)
raise

def _create_log_stream_if_needed(self):
"""Create log stream if it doesn't exist."""
try:
self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
logger.info("Created log stream: %s", self.log_stream_name)
except ClientError as error:
if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
logger.debug("Log stream %s already exists", self.log_stream_name)
else:
logger.error("Failed to create log stream %s : %s", self.log_stream_name, error)
raise
with suppress_instrumentation():
try:
self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
logger.info("Created log stream: %s", self.log_stream_name)
except ClientError as error:
if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException":
logger.debug("Log stream %s already exists", self.log_stream_name)
else:
logger.error("Failed to create log stream %s : %s", self.log_stream_name, error)
raise

def _validate_log_event(self, log_event: Dict) -> bool:
"""
Expand Down Expand Up @@ -277,52 +281,52 @@ def _send_log_batch(self, batch: LogEventBatch) -> None:
}

start_time = time.time()

try:
# Make the PutLogEvents call
response = self.logs_client.put_log_events(**put_log_events_input)

elapsed_ms = int((time.time() - start_time) * 1000)
logger.debug(
"Successfully sent %s log events (%s KB) in %s ms",
batch.size(),
batch.byte_total / 1024,
elapsed_ms,
)

return response

except ClientError as error:
# Handle resource not found errors by creating log group/stream
error_code = error.response.get("Error", {}).get("Code")
if error_code == "ResourceNotFoundException":
logger.info("Log group or stream not found, creating resources and retrying")

try:
# Create log group first
self._create_log_group_if_needed()
# Then create log stream
self._create_log_stream_if_needed()

# Retry the PutLogEvents call
response = self.logs_client.put_log_events(**put_log_events_input)

elapsed_ms = int((time.time() - start_time) * 1000)
logger.debug(
"Successfully sent %s log events (%s KB) in %s ms after creating resources",
batch.size(),
batch.byte_total / 1024,
elapsed_ms,
)

return response

except ClientError as retry_error:
logger.error("Failed to send log events after creating resources: %s", retry_error)
with suppress_instrumentation():
try:
# Make the PutLogEvents call
response = self.logs_client.put_log_events(**put_log_events_input)

elapsed_ms = int((time.time() - start_time) * 1000)
logger.debug(
"Successfully sent %s log events (%s KB) in %s ms",
batch.size(),
batch.byte_total / 1024,
elapsed_ms,
)

return response

except ClientError as error:
# Handle resource not found errors by creating log group/stream
error_code = error.response.get("Error", {}).get("Code")
if error_code == "ResourceNotFoundException":
logger.info("Log group or stream not found, creating resources and retrying")
with suppress_instrumentation():
try:
# Create log group first
self._create_log_group_if_needed()
# Then create log stream
self._create_log_stream_if_needed()

# Retry the PutLogEvents call
response = self.logs_client.put_log_events(**put_log_events_input)

elapsed_ms = int((time.time() - start_time) * 1000)
logger.debug(
"Successfully sent %s log events (%s KB) in %s ms after creating resources",
batch.size(),
batch.byte_total / 1024,
elapsed_ms,
)

return response

except ClientError as retry_error:
logger.error("Failed to send log events after creating resources: %s", retry_error)
raise
else:
logger.error("Failed to send log events: %s", error)
raise
else:
logger.error("Failed to send log events: %s", error)
raise

def send_log_event(self, log_event: Dict[str, Any]):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import time
import unittest
from unittest.mock import Mock, patch
from unittest.mock import MagicMock, Mock, patch

from botocore.exceptions import ClientError

Expand Down Expand Up @@ -579,6 +579,134 @@ def test_is_batch_active_edge_cases(self):
result = self.log_client._is_batch_active(batch2, current_time)
self.assertFalse(result)

@patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation")
def test_create_log_group_uses_suppress_instrumentation(self, mock_suppress):
"""Test that _create_log_group_if_needed uses suppress_instrumentation."""
# Configure the mock context manager
mock_context = MagicMock()
mock_suppress.return_value = mock_context
mock_context.__enter__.return_value = mock_context
mock_context.__exit__.return_value = None

# Call the method
self.log_client._create_log_group_if_needed()

# Verify suppress_instrumentation was called
mock_suppress.assert_called_once()
mock_context.__enter__.assert_called_once()
mock_context.__exit__.assert_called_once()

# Verify the AWS call happened within the context
self.log_client.logs_client.create_log_group.assert_called_once_with(logGroupName="test-log-group")

@patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation")
def test_create_log_stream_uses_suppress_instrumentation(self, mock_suppress):
"""Test that _create_log_stream_if_needed uses suppress_instrumentation."""
# Configure the mock context manager
mock_context = MagicMock()
mock_suppress.return_value = mock_context
mock_context.__enter__.return_value = mock_context
mock_context.__exit__.return_value = None

# Call the method
self.log_client._create_log_stream_if_needed()

# Verify suppress_instrumentation was called
mock_suppress.assert_called_once()
mock_context.__enter__.assert_called_once()
mock_context.__exit__.assert_called_once()

# Verify the AWS call happened within the context
self.log_client.logs_client.create_log_stream.assert_called_once()

@patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation")
def test_send_log_batch_uses_suppress_instrumentation(self, mock_suppress):
"""Test that _send_log_batch uses suppress_instrumentation."""
# Configure the mock context manager
mock_context = MagicMock()
mock_suppress.return_value = mock_context
mock_context.__enter__.return_value = mock_context
mock_context.__exit__.return_value = None

# Create a batch with events
batch = self.log_client._create_event_batch()
batch.add_event({"message": "test", "timestamp": int(time.time() * 1000)}, 10)

# Mock successful put_log_events
self.log_client.logs_client.put_log_events.return_value = {"nextSequenceToken": "12345"}

# Call the method
self.log_client._send_log_batch(batch)

# Verify suppress_instrumentation was called
mock_suppress.assert_called_once()
mock_context.__enter__.assert_called_once()
mock_context.__exit__.assert_called_once()

# Verify the AWS call happened within the context
self.log_client.logs_client.put_log_events.assert_called_once()

@patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation")
def test_send_log_batch_retry_uses_suppress_instrumentation(self, mock_suppress):
"""Test that _send_log_batch retry logic also uses suppress_instrumentation."""
# Configure the mock context manager
mock_context = MagicMock()
mock_suppress.return_value = mock_context
mock_context.__enter__.return_value = mock_context
mock_context.__exit__.return_value = None

# Create a batch with events
batch = self.log_client._create_event_batch()
batch.add_event({"message": "test", "timestamp": int(time.time() * 1000)}, 10)

# Mock put_log_events to fail first with ResourceNotFoundException, then succeed
self.log_client.logs_client.put_log_events.side_effect = [
ClientError({"Error": {"Code": "ResourceNotFoundException"}}, "PutLogEvents"),
{"nextSequenceToken": "12345"},
]

# Call the method
self.log_client._send_log_batch(batch)

# Verify suppress_instrumentation was called:
# 1. Initial _send_log_batch context
# 2. Nested context in the retry block
# 3. _create_log_group_if_needed context
# 4. _create_log_stream_if_needed context
self.assertEqual(mock_suppress.call_count, 4)
# Each context should have been properly entered and exited
self.assertEqual(mock_context.__enter__.call_count, 4)
self.assertEqual(mock_context.__exit__.call_count, 4)

# Verify AWS calls happened
self.assertEqual(self.log_client.logs_client.put_log_events.call_count, 2)
self.log_client.logs_client.create_log_group.assert_called_once()
self.log_client.logs_client.create_log_stream.assert_called_once()

@patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation")
def test_create_log_group_exception_still_uses_suppress_instrumentation(self, mock_suppress):
"""Test that suppress_instrumentation is properly used even when exceptions occur."""
# Configure the mock context manager
mock_context = MagicMock()
mock_suppress.return_value = mock_context
mock_context.__enter__.return_value = mock_context
mock_context.__exit__.return_value = None

# Make create_log_group raise an exception
self.log_client.logs_client.create_log_group.side_effect = ClientError(
{"Error": {"Code": "AccessDenied"}}, "CreateLogGroup"
)

# Call should raise the exception
with self.assertRaises(ClientError):
self.log_client._create_log_group_if_needed()

# Verify suppress_instrumentation was still properly used
mock_suppress.assert_called_once()
mock_context.__enter__.assert_called_once()
# __exit__ should be called even though an exception was raised
mock_context.__exit__.assert_called_once()


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def test_configure_with_agent_observability_enabled(
self.assertEqual(
os.environ.get("OTEL_PYTHON_DISABLED_INSTRUMENTATIONS"),
"http,sqlalchemy,psycopg2,pymysql,sqlite3,aiopg,asyncpg,mysql_connector,"
"botocore,boto3,urllib3,requests,starlette",
"urllib3,requests,starlette,system_metrics,google-genai",
)
self.assertEqual(os.environ.get("OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED"), "true")
self.assertEqual(os.environ.get("OTEL_AWS_APPLICATION_SIGNALS_ENABLED"), "false")
Expand Down
Loading