Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,18 @@ def _send_log_batch(self, batch: LogEventBatch) -> None:
logger.info("Log group or stream not found, creating resources and retrying")
with suppress_instrumentation():
try:
# Create log group first
self._create_log_group_if_needed()
# Then create log stream
self._create_log_stream_if_needed()
# Try creating the log stream first — the log group
# may already exist (e.g. managed by IaC)
try:
self._create_log_stream_if_needed()
except ClientError as stream_error:
stream_error_code = stream_error.response.get("Error", {}).get("Code")
if stream_error_code == "ResourceNotFoundException":
# Log group doesn't exist either, create both
self._create_log_group_if_needed()
self._create_log_stream_if_needed()
else:
raise

# Retry the PutLogEvents call
response = self.logs_client.put_log_events(**put_log_events_input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,11 @@ def test_send_log_event_method_exists(self):
self.fail(f"send_log_event raised an exception: {error}")

def test_send_log_batch_with_resource_not_found(self):
"""Test lazy creation when put_log_events fails with ResourceNotFoundException."""
"""Test lazy creation when put_log_events fails with ResourceNotFoundException.

The recovery path tries creating the log stream first. If that succeeds
(log group already exists), log group creation is skipped entirely.
"""
batch = self.log_client._create_event_batch()
batch.add_event({"message": "test message", "timestamp": int(time.time() * 1000)}, 10)

Expand All @@ -283,7 +287,7 @@ def test_send_log_batch_with_resource_not_found(self):
{"nextSequenceToken": "12345"},
]

# Mock the create methods
# Mock the create methods — stream creation succeeds (group exists)
mock_create_group = Mock()
mock_create_stream = Mock()
self.log_client._create_log_group_if_needed = mock_create_group
Expand All @@ -292,13 +296,47 @@ def test_send_log_batch_with_resource_not_found(self):
# Should not raise an exception and should create resources
self.log_client._send_log_batch(batch)

# Verify that creation methods were called
mock_create_group.assert_called_once()
# Stream creation attempted first; group creation NOT needed
mock_create_group.assert_not_called()
mock_create_stream.assert_called_once()

# Verify put_log_events was called twice (initial attempt + retry)
self.assertEqual(mock_put.call_count, 2)

def test_send_log_batch_with_resource_not_found_creates_group_and_stream(self):
    """Exercise the recovery path where the first stream creation attempt fails.

    PutLogEvents is mocked to raise ResourceNotFoundException once and then
    succeed. The stream-creation helper is mocked to raise
    ResourceNotFoundException on its first call and return normally on its
    second. The test expects the group-creation helper to be invoked exactly
    once, the stream-creation helper twice, and PutLogEvents twice.
    """

    def resource_not_found(operation_name):
        # Build a fresh error (and a fresh response dict) for every use.
        return ClientError({"Error": {"Code": "ResourceNotFoundException"}}, operation_name)

    event_batch = self.log_client._create_event_batch()
    event_batch.add_event({"message": "test message", "timestamp": int(time.time() * 1000)}, 10)

    # PutLogEvents: one failure, then a successful response on the retry.
    put_events = self.log_client.logs_client.put_log_events
    put_events.side_effect = [
        resource_not_found("PutLogEvents"),
        {"nextSequenceToken": "12345"},
    ]

    # Stream creation: raises on the first call, returns None on the second.
    create_stream = Mock(side_effect=[resource_not_found("CreateLogStream"), None])
    create_group = Mock()
    self.log_client._create_log_group_if_needed = create_group
    self.log_client._create_log_stream_if_needed = create_stream

    self.log_client._send_log_batch(event_batch)

    # Expected helper usage: stream (raises), group, stream (returns).
    create_group.assert_called_once()
    self.assertEqual(create_stream.call_count, 2)
    self.assertEqual(put_events.call_count, 2)

def test_send_log_batch_with_other_error(self):
"""Test that non-ResourceNotFoundException errors are re-raised."""
batch = self.log_client._create_event_batch()
Expand All @@ -313,6 +351,37 @@ def test_send_log_batch_with_other_error(self):
with self.assertRaises(ClientError):
self.log_client._send_log_batch(batch)

def test_send_log_batch_recovers_when_log_group_managed_externally(self):
    """Check that _send_log_batch completes without ever calling CreateLogGroup.

    PutLogEvents is set up to raise ResourceNotFoundException once and then
    succeed. CreateLogGroup is set up to raise AccessDeniedException if it
    is ever invoked (the externally managed log group scenario). The test
    expects no exception, no CreateLogGroup call, exactly one
    CreateLogStream call, and two PutLogEvents calls.
    """
    event_batch = self.log_client._create_event_batch()
    event_batch.add_event({"message": "test message", "timestamp": int(time.time() * 1000)}, 10)

    logs_api = self.log_client.logs_client

    # The initial PutLogEvents call fails; the retry returns a normal response.
    missing_resource = ClientError({"Error": {"Code": "ResourceNotFoundException"}}, "PutLogEvents")
    logs_api.put_log_events.side_effect = [
        missing_resource,
        {"nextSequenceToken": "12345"},
    ]

    # Any CreateLogGroup call would raise; the assertions below require
    # that no such call is made.
    access_denied = ClientError({"Error": {"Code": "AccessDeniedException"}}, "CreateLogGroup")
    logs_api.create_log_group.side_effect = access_denied

    # Must complete without raising.
    self.log_client._send_log_batch(event_batch)

    logs_api.create_log_group.assert_not_called()
    logs_api.create_log_stream.assert_called_once()
    self.assertEqual(logs_api.put_log_events.call_count, 2)

def test_create_log_stream_if_needed_success(self):
"""Test log stream creation when needed."""
# This method should not raise an exception
Expand Down Expand Up @@ -671,16 +740,15 @@ def test_send_log_batch_retry_uses_suppress_instrumentation(self, mock_suppress)
# Verify suppress_instrumentation was called:
# 1. Initial _send_log_batch context
# 2. Nested context in the retry block
# 3. _create_log_group_if_needed context
# 4. _create_log_stream_if_needed context
self.assertEqual(mock_suppress.call_count, 4)
# 3. _create_log_stream_if_needed context (stream-first, succeeds so no group creation)
self.assertEqual(mock_suppress.call_count, 3)
# Each context should have been properly entered and exited
self.assertEqual(mock_context.__enter__.call_count, 4)
self.assertEqual(mock_context.__exit__.call_count, 4)
self.assertEqual(mock_context.__enter__.call_count, 3)
self.assertEqual(mock_context.__exit__.call_count, 3)

# Verify AWS calls happened
self.assertEqual(self.log_client.logs_client.put_log_events.call_count, 2)
self.log_client.logs_client.create_log_group.assert_called_once()
self.log_client.logs_client.create_log_group.assert_not_called()
self.log_client.logs_client.create_log_stream.assert_called_once()

@patch("amazon.opentelemetry.distro.exporter.aws.metrics._cloudwatch_log_client.suppress_instrumentation")
Expand Down