diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py index aaab17951..763ad4d85 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py @@ -118,7 +118,7 @@ def _configure(self, **kwargs): os.environ.setdefault( OTEL_PYTHON_DISABLED_INSTRUMENTATIONS, "http,sqlalchemy,psycopg2,pymysql,sqlite3,aiopg,asyncpg,mysql_connector," - "botocore,boto3,urllib3,requests,starlette", + "urllib3,requests,starlette,system_metrics,google-genai", ) # Set logging auto instrumentation default diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py index b7daac12b..13879d54f 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py @@ -11,6 +11,8 @@ from botocore.exceptions import ClientError from botocore.session import Session +from opentelemetry.instrumentation.utils import suppress_instrumentation + logger = logging.getLogger(__name__) @@ -118,27 +120,29 @@ def _generate_log_stream_name(self) -> str: def _create_log_group_if_needed(self): """Create log group if it doesn't exist.""" - try: - self.logs_client.create_log_group(logGroupName=self.log_group_name) - logger.info("Created log group: %s", self.log_group_name) - except ClientError as error: - if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": - logger.debug("Log group %s already exists", self.log_group_name) - else: - logger.error("Failed to create log group %s : %s", self.log_group_name, error) - raise + with suppress_instrumentation(): + try: + self.logs_client.create_log_group(logGroupName=self.log_group_name) + logger.info("Created log group: %s", self.log_group_name) + except ClientError as error: + if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": + logger.debug("Log group %s already exists", self.log_group_name) + else: + logger.error("Failed to create log group %s : %s", self.log_group_name, error) + raise def _create_log_stream_if_needed(self): """Create log stream if it doesn't exist.""" - try: - self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name) - logger.info("Created log stream: %s", self.log_stream_name) - except ClientError as error: - if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": - logger.debug("Log stream %s already exists", self.log_stream_name) - else: - logger.error("Failed to create log stream %s : %s", self.log_stream_name, error) - raise + with suppress_instrumentation(): + try: + self.logs_client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name) + logger.info("Created log stream: %s", self.log_stream_name) + except ClientError as error: + if error.response.get("Error", {}).get("Code") == "ResourceAlreadyExistsException": + logger.debug("Log stream %s already exists", self.log_stream_name) + else: + logger.error("Failed to create log stream %s : %s", self.log_stream_name, error) + raise def _validate_log_event(self, log_event: Dict) -> bool: """ @@ -277,52 +281,52 @@ def _send_log_batch(self, batch: LogEventBatch) -> None: } start_time = time.time() - - try: - # Make the PutLogEvents call - response = self.logs_client.put_log_events(**put_log_events_input) - - elapsed_ms = int((time.time() - start_time) * 1000) - logger.debug( - "Successfully sent %s log events (%s KB) in %s ms", - batch.size(), - batch.byte_total / 1024, - elapsed_ms, - ) - - return response - - except ClientError as error: - # Handle resource not found errors by creating log group/stream - error_code = error.response.get("Error", {}).get("Code") - if error_code == "ResourceNotFoundException": - logger.info("Log group or stream not found, creating resources and retrying") - - try: - # Create log group first - self._create_log_group_if_needed() - # Then create log stream - self._create_log_stream_if_needed() - - # Retry the PutLogEvents call - response = self.logs_client.put_log_events(**put_log_events_input) - - elapsed_ms = int((time.time() - start_time) * 1000) - logger.debug( - "Successfully sent %s log events (%s KB) in %s ms after creating resources", - batch.size(), - batch.byte_total / 1024, - elapsed_ms, - ) - - return response - - except ClientError as retry_error: - logger.error("Failed to send log events after creating resources: %s", retry_error) + with suppress_instrumentation(): + try: + # Make the PutLogEvents call + response = self.logs_client.put_log_events(**put_log_events_input) + + elapsed_ms = int((time.time() - start_time) * 1000) + logger.debug( + "Successfully sent %s log events (%s KB) in %s ms", + batch.size(), + batch.byte_total / 1024, + elapsed_ms, + ) + + return response + + except ClientError as error: + # Handle resource not found errors by creating log group/stream + error_code = error.response.get("Error", {}).get("Code") + if error_code == "ResourceNotFoundException": + logger.info("Log group or stream not found, creating resources and retrying") + with suppress_instrumentation(): + try: + # Create log group first + self._create_log_group_if_needed() + # Then create log stream + self._create_log_stream_if_needed() + + # Retry the PutLogEvents call + response = self.logs_client.put_log_events(**put_log_events_input) + + elapsed_ms = int((time.time() - start_time) * 1000) + logger.debug( + "Successfully sent %s log events (%s KB) in %s ms after creating resources", + batch.size(), + batch.byte_total / 1024, + elapsed_ms, + ) + + return response + + except ClientError as retry_error: + logger.error("Failed to send log events after creating resources: %s", retry_error) + raise + else: + logger.error("Failed to send log events: %s", error) raise - else: - logger.error("Failed to send log events: %s", error) - raise def send_log_event(self, log_event: Dict[str, Any]): """ diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py index 2793aeb34..f0fcf565b 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py @@ -5,7 +5,7 @@ import time import unittest -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch from botocore.exceptions import ClientError @@ -579,6 +579,134 @@ def test_is_batch_active_edge_cases(self): result = self.log_client._is_batch_active(batch2, current_time) self.assertFalse(result) + @patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation") + def test_create_log_group_uses_suppress_instrumentation(self, mock_suppress): + """Test that _create_log_group_if_needed uses suppress_instrumentation.""" + # Configure the mock context manager + mock_context = MagicMock() + mock_suppress.return_value = mock_context + mock_context.__enter__.return_value = mock_context + mock_context.__exit__.return_value = None + + # Call the method + self.log_client._create_log_group_if_needed() + + # Verify suppress_instrumentation was called + mock_suppress.assert_called_once() + mock_context.__enter__.assert_called_once() + mock_context.__exit__.assert_called_once() + + # Verify the AWS call happened within the context + self.log_client.logs_client.create_log_group.assert_called_once_with(logGroupName="test-log-group") + + @patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation") + def test_create_log_stream_uses_suppress_instrumentation(self, mock_suppress): + """Test that _create_log_stream_if_needed uses suppress_instrumentation.""" + # Configure the mock context manager + mock_context = MagicMock() + mock_suppress.return_value = mock_context + mock_context.__enter__.return_value = mock_context + mock_context.__exit__.return_value = None + + # Call the method + self.log_client._create_log_stream_if_needed() + + # Verify suppress_instrumentation was called + mock_suppress.assert_called_once() + mock_context.__enter__.assert_called_once() + mock_context.__exit__.assert_called_once() + + # Verify the AWS call happened within the context + self.log_client.logs_client.create_log_stream.assert_called_once() + + @patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation") + def test_send_log_batch_uses_suppress_instrumentation(self, mock_suppress): + """Test that _send_log_batch uses suppress_instrumentation.""" + # Configure the mock context manager + mock_context = MagicMock() + mock_suppress.return_value = mock_context + mock_context.__enter__.return_value = mock_context + mock_context.__exit__.return_value = None + + # Create a batch with events + batch = self.log_client._create_event_batch() + batch.add_event({"message": "test", "timestamp": int(time.time() * 1000)}, 10) + + # Mock successful put_log_events + self.log_client.logs_client.put_log_events.return_value = {"nextSequenceToken": "12345"} + + # Call the method + self.log_client._send_log_batch(batch) + + # Verify suppress_instrumentation was called + mock_suppress.assert_called_once() + mock_context.__enter__.assert_called_once() + mock_context.__exit__.assert_called_once() + + # Verify the AWS call happened within the context + self.log_client.logs_client.put_log_events.assert_called_once() + + @patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation") + def test_send_log_batch_retry_uses_suppress_instrumentation(self, mock_suppress): + """Test that _send_log_batch retry logic also uses suppress_instrumentation.""" + # Configure the mock context manager + mock_context = MagicMock() + mock_suppress.return_value = mock_context + mock_context.__enter__.return_value = mock_context + mock_context.__exit__.return_value = None + + # Create a batch with events + batch = self.log_client._create_event_batch() + batch.add_event({"message": "test", "timestamp": int(time.time() * 1000)}, 10) + + # Mock put_log_events to fail first with ResourceNotFoundException, then succeed + self.log_client.logs_client.put_log_events.side_effect = [ + ClientError({"Error": {"Code": "ResourceNotFoundException"}}, "PutLogEvents"), + {"nextSequenceToken": "12345"}, + ] + + # Call the method + self.log_client._send_log_batch(batch) + + # Verify suppress_instrumentation was called: + # 1. Initial _send_log_batch context + # 2. Nested context in the retry block + # 3. _create_log_group_if_needed context + # 4. _create_log_stream_if_needed context + self.assertEqual(mock_suppress.call_count, 4) + # Each context should have been properly entered and exited + self.assertEqual(mock_context.__enter__.call_count, 4) + self.assertEqual(mock_context.__exit__.call_count, 4) + + # Verify AWS calls happened + self.assertEqual(self.log_client.logs_client.put_log_events.call_count, 2) + self.log_client.logs_client.create_log_group.assert_called_once() + self.log_client.logs_client.create_log_stream.assert_called_once() + + @patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation") + def test_create_log_group_exception_still_uses_suppress_instrumentation(self, mock_suppress): + """Test that suppress_instrumentation is properly used even when exceptions occur.""" + # Configure the mock context manager + mock_context = MagicMock() + mock_suppress.return_value = mock_context + mock_context.__enter__.return_value = mock_context + mock_context.__exit__.return_value = None + + # Make create_log_group raise an exception + self.log_client.logs_client.create_log_group.side_effect = ClientError( + {"Error": {"Code": "AccessDenied"}}, "CreateLogGroup" + ) + + # Call should raise the exception + with self.assertRaises(ClientError): + self.log_client._create_log_group_if_needed() + + # Verify suppress_instrumentation was still properly used + mock_suppress.assert_called_once() + mock_context.__enter__.assert_called_once() + # __exit__ should be called even though an exception was raised + mock_context.__exit__.assert_called_once() + if __name__ == "__main__": unittest.main() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelemetry_distro.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelemetry_distro.py index 3b20e8308..5a91c4a83 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelemetry_distro.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelemetry_distro.py @@ -127,7 +127,7 @@ def test_configure_with_agent_observability_enabled( self.assertEqual( os.environ.get("OTEL_PYTHON_DISABLED_INSTRUMENTATIONS"), "http,sqlalchemy,psycopg2,pymysql,sqlite3,aiopg,asyncpg,mysql_connector," - "botocore,boto3,urllib3,requests,starlette", + "urllib3,requests,starlette,system_metrics,google-genai", ) self.assertEqual(os.environ.get("OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED"), "true") self.assertEqual(os.environ.get("OTEL_AWS_APPLICATION_SIGNALS_ENABLED"), "false")