Skip to content

Commit dc98cf8

Browse files
committed
add interruptible shutdown
1 parent 8b7e671 commit dc98cf8

File tree

2 files changed

+46
-26
lines changed

2 files changed

+46
-26
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import logging
77
import random
88
from io import BytesIO
9-
from time import sleep, time
9+
from threading import Event
10+
from time import time
1011
from typing import Dict, Optional, Sequence
1112

1213
from requests import Response
@@ -60,6 +61,7 @@ def __init__(
6061
compression=Compression.Gzip,
6162
session=AwsAuthSession(aws_region=self._aws_region, service="logs"),
6263
)
64+
self._shutdown_event = Event()
6365

6466
def export(self, batch: Sequence[LogData]) -> LogExportResult:
6567
"""
@@ -114,9 +116,18 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
114116
resp.reason,
115117
backoff_seconds,
116118
)
117-
sleep(backoff_seconds)
119+
# Use interruptible sleep that can be interrupted by shutdown
120+
if self._shutdown_event.wait(backoff_seconds):
121+
_logger.info("Export interrupted by shutdown")
122+
return LogExportResult.FAILURE
123+
118124
retry_num += 1
119125

126+
def shutdown(self) -> None:
127+
"""Shutdown the exporter and interrupt any ongoing waits."""
128+
self._shutdown_event.set()
129+
return super().shutdown()
130+
120131
def _send(self, serialized_data: bytes, timeout_sec: float):
121132
try:
122133
response = self._session.post(

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -73,19 +73,17 @@ def test_should_not_export_again_if_not_retryable(self, mock_request):
7373

7474
self.assertEqual(result, LogExportResult.FAILURE)
7575

76-
@patch(
77-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
78-
)
76+
@patch("threading.Event.wait", side_effect=lambda x: False)
7977
@patch("requests.Session.post", return_value=retryable_response_no_header)
80-
def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header(self, mock_request, mock_sleep):
78+
def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header(self, mock_request, mock_wait):
8179
"""Tests that multiple export requests are made with exponential delay if the response status code is retryable.
8280
But there is no Retry-After header."""
8381
self.exporter._timeout = 10000 # Large timeout to avoid early exit
8482
result = self.exporter.export(self.logs)
8583

86-
self.assertEqual(mock_sleep.call_count, _MAX_RETRYS - 1)
84+
self.assertEqual(mock_wait.call_count, _MAX_RETRYS - 1)
8785

88-
delays = mock_sleep.call_args_list
86+
delays = mock_wait.call_args_list
8987

9088
for index, delay in enumerate(delays):
9189
expected_base = 2**index
@@ -97,30 +95,26 @@ def test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header
9795
self.assertEqual(mock_request.call_count, _MAX_RETRYS)
9896
self.assertEqual(result, LogExportResult.FAILURE)
9997

100-
@patch(
101-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
102-
)
98+
@patch("threading.Event.wait", side_effect=lambda x: False)
10399
@patch(
104100
"requests.Session.post",
105101
side_effect=[retryable_response_header, retryable_response_header, retryable_response_header, good_response],
106102
)
107-
def test_should_export_again_with_server_delay_if_retryable_and_retry_after_header(self, mock_request, mock_sleep):
103+
def test_should_export_again_with_server_delay_if_retryable_and_retry_after_header(self, mock_request, mock_wait):
108104
"""Tests that multiple export requests are made with the server's suggested
109105
delay if the response status code is retryable and there is a Retry-After header."""
110106
self.exporter._timeout = 10000 # Large timeout to avoid early exit
111107
result = self.exporter.export(self.logs)
112-
delays = mock_sleep.call_args_list
108+
delays = mock_wait.call_args_list
113109

114110
for delay in delays:
115111
self.assertEqual(delay[0][0], 10)
116112

117-
self.assertEqual(mock_sleep.call_count, 3)
113+
self.assertEqual(mock_wait.call_count, 3)
118114
self.assertEqual(mock_request.call_count, 4)
119115
self.assertEqual(result, LogExportResult.SUCCESS)
120116

121-
@patch(
122-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
123-
)
117+
@patch("threading.Event.wait", side_effect=lambda x: False)
124118
@patch(
125119
"requests.Session.post",
126120
side_effect=[
@@ -131,13 +125,13 @@ def test_should_export_again_with_server_delay_if_retryable_and_retry_after_head
131125
],
132126
)
133127
def test_should_export_again_with_backoff_delay_if_retryable_and_bad_retry_after_header(
134-
self, mock_request, mock_sleep
128+
self, mock_request, mock_wait
135129
):
136130
"""Tests that multiple export requests are made with exponential delay if the response status code is retryable.
137131
but the Retry-After header is invalid or malformed."""
138132
self.exporter._timeout = 10000 # Large timeout to avoid early exit
139133
result = self.exporter.export(self.logs)
140-
delays = mock_sleep.call_args_list
134+
delays = mock_wait.call_args_list
141135

142136
for index, delay in enumerate(delays):
143137
expected_base = 2**index
@@ -146,7 +140,7 @@ def test_should_export_again_with_backoff_delay_if_retryable_and_bad_retry_after
146140
self.assertGreaterEqual(actual_delay, expected_base * 0.8)
147141
self.assertLessEqual(actual_delay, expected_base * 1.2)
148142

149-
self.assertEqual(mock_sleep.call_count, 3)
143+
self.assertEqual(mock_wait.call_count, 3)
150144
self.assertEqual(mock_request.call_count, 4)
151145
self.assertEqual(result, LogExportResult.SUCCESS)
152146

@@ -158,29 +152,44 @@ def test_export_connection_error_retry(self, mock_request):
158152
self.assertEqual(mock_request.call_count, 2)
159153
self.assertEqual(result, LogExportResult.SUCCESS)
160154

161-
@patch(
162-
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.sleep", side_effect=lambda x: None
163-
)
155+
@patch("threading.Event.wait", side_effect=lambda x: False)
164156
@patch("requests.Session.post", return_value=retryable_response_no_header)
165-
def test_should_stop_retrying_when_deadline_exceeded(self, mock_request, mock_sleep):
157+
def test_should_stop_retrying_when_deadline_exceeded(self, mock_request, mock_wait):
166158
"""Tests that the exporter stops retrying when the deadline is exceeded."""
167159
self.exporter._timeout = 5 # Short timeout to trigger deadline check
168160

169-
# Mock time to simulate time passing
170161
with patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.time") as mock_time:
171162
# First call returns start time, subsequent calls simulate time passing
172163
mock_time.side_effect = [0, 0, 1, 2, 4, 8] # Exponential backoff would be 1, 2, 4 seconds
173164

174165
result = self.exporter.export(self.logs)
175166

176167
# Should stop before max retries due to deadline
177-
self.assertLess(mock_sleep.call_count, _MAX_RETRYS)
168+
self.assertLess(mock_wait.call_count, _MAX_RETRYS)
178169
self.assertLess(mock_request.call_count, _MAX_RETRYS + 1)
179170
self.assertEqual(result, LogExportResult.FAILURE)
180171

181172
# Verify total time passed is at the timeout limit
182173
self.assertGreaterEqual(5, self.exporter._timeout)
183174

175+
@patch("requests.Session.post", return_value=retryable_response_no_header)
176+
def test_export_interrupted_by_shutdown(self, mock_request):
177+
"""Tests that export can be interrupted by shutdown during retry wait."""
178+
self.exporter._timeout = 10000
179+
180+
# Mock Event.wait to call shutdown on first call, then return True (interrupted)
181+
# We cannot call shutdown() at the beginning since the exporter would just automatically return a FAILURE result without even attempting the export.
182+
def mock_wait_with_shutdown(timeout):
183+
self.exporter.shutdown()
184+
return True
185+
186+
with patch.object(self.exporter._shutdown_event, 'wait', side_effect=mock_wait_with_shutdown):
187+
result = self.exporter.export(self.logs)
188+
189+
# Should make one request, then get interrupted during retry wait
190+
self.assertEqual(mock_request.call_count, 1)
191+
self.assertEqual(result, LogExportResult.FAILURE)
192+
184193
@staticmethod
185194
def generate_test_log_data(count=5):
186195
logs = []

0 commit comments

Comments
 (0)