Skip to content

Commit 1db746a

Browse files
committed
ref(async transport): Separate flush/close and async flush/close
GH-4601
1 parent f44f690 commit 1db746a

File tree

5 files changed

+89
-77
lines changed

5 files changed

+89
-77
lines changed

sentry_sdk/api.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,14 @@ def flush(
226226
return get_client().flush(timeout=timeout, callback=callback)
227227

228228

229+
@clientmethod
230+
def flush_async(
231+
timeout: Optional[float] = None,
232+
callback: Optional[Callable[[int, float], None]] = None,
233+
) -> None:
234+
return get_client().flush_async(timeout=timeout, callback=callback)
235+
236+
229237
def start_span(**kwargs: Any) -> Span:
230238
"""
231239
Start and return a span.

sentry_sdk/client.py

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ def close(self, *args: Any, **kwargs: Any) -> None:
215215
def flush(self, *args: Any, **kwargs: Any) -> None:
216216
return None
217217

218+
async def close_async(self, *args: Any, **kwargs: Any) -> None:
219+
return None
220+
221+
async def flush_async(self, *args: Any, **kwargs: Any) -> None:
222+
return None
223+
218224
def __enter__(self) -> BaseClient:
219225
return self
220226

@@ -934,61 +940,58 @@ def _close_components(self) -> None:
934940
if self.monitor:
935941
self.monitor.kill()
936942

937-
async def _close_components_async(self) -> None:
938-
"""Async version of _close_components that properly awaits transport cleanup."""
939-
self._close_components()
940-
cleanup_task = self._close_transport()
941-
if cleanup_task is not None:
942-
await cleanup_task
943-
944-
def close( # type: ignore[override]
943+
def close(
945944
self,
946945
timeout: Optional[float] = None,
947946
callback: Optional[Callable[[int, float], None]] = None,
948-
) -> Optional[asyncio.Task[None]]:
947+
) -> None:
949948
"""
950949
Close the client and shut down the transport. Arguments have the same
951-
semantics as :py:meth:`Client.flush`. When using the async transport, close needs to be awaited to block.
950+
semantics as :py:meth:`Client.flush`.
952951
"""
953-
954-
async def _flush_and_close(
955-
timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
956-
) -> None:
957-
958-
await self._flush_async(timeout=timeout, callback=callback)
959-
await self._close_components_async()
960-
961952
if self.transport is not None:
962953
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
963954
self.transport, "loop"
964955
):
956+
logger.debug(
957+
"close() used with AsyncHttpTransport, aborting. Please use close_async() instead."
958+
)
959+
return
960+
self.flush(timeout=timeout, callback=callback)
961+
self._close_components()
962+
self.transport.kill()
963+
self.transport = None
965964

966-
try:
967-
flush_task = self.transport.loop.create_task(
968-
_flush_and_close(timeout, callback)
969-
)
970-
except RuntimeError:
971-
# Shutdown the components anyway
972-
self._close_components()
973-
self._close_transport()
974-
logger.warning("Event loop not running, aborting close.")
975-
return None
976-
# Enforce flush before shutdown
977-
return flush_task
978-
else:
979-
self.flush(timeout=timeout, callback=callback)
980-
self._close_components()
981-
self._close_transport()
982-
983-
return None
965+
async def close_async(
966+
self,
967+
timeout: Optional[float] = None,
968+
callback: Optional[Callable[[int, float], None]] = None,
969+
) -> None:
970+
"""
971+
Asynchronously close the client and shut down the transport. Arguments have the same
972+
semantics as :py:meth:`Client.flush_async`.
973+
"""
974+
if self.transport is not None:
975+
if not (
976+
isinstance(self.transport, AsyncHttpTransport)
977+
and hasattr(self.transport, "loop")
978+
):
979+
logger.debug(
980+
"close_async() used with non-async transport, aborting. Please use close() instead."
981+
)
982+
return
983+
await self.flush_async(timeout=timeout, callback=callback)
984+
self._close_components()
985+
await self.transport.kill()
986+
self.transport = None
984987

985-
def flush( # type: ignore[override]
988+
def flush(
986989
self,
987990
timeout: Optional[float] = None,
988991
callback: Optional[Callable[[int, float], None]] = None,
989-
) -> Optional[asyncio.Task[None]]:
992+
) -> None:
990993
"""
991-
Wait for the current events to be sent. When using the async transport, flush needs to be awaited to block.
994+
Wait for the current events to be sent.
992995
993996
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
994997
@@ -998,37 +1001,40 @@ def flush( # type: ignore[override]
9981001
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
9991002
self.transport, "loop"
10001003
):
1001-
try:
1002-
return self.transport.loop.create_task(
1003-
self._flush_async(timeout, callback)
1004-
)
1005-
except RuntimeError:
1006-
logger.warning("Event loop not running, aborting flush.")
1007-
return None
1008-
else:
1009-
self._flush_sync(timeout, callback)
1010-
return None
1011-
1012-
def _flush_sync(
1013-
self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
1014-
) -> None:
1015-
"""Synchronous flush implementation."""
1016-
if timeout is None:
1017-
timeout = self.options["shutdown_timeout"]
1004+
logger.debug(
1005+
"flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead."
1006+
)
1007+
return
1008+
if timeout is None:
1009+
timeout = self.options["shutdown_timeout"]
1010+
self._flush_components()
10181011

1019-
self._flush_components()
1020-
if self.transport is not None:
10211012
self.transport.flush(timeout=timeout, callback=callback)
10221013

1023-
async def _flush_async(
1024-
self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
1014+
async def flush_async(
1015+
self,
1016+
timeout: Optional[float] = None,
1017+
callback: Optional[Callable[[int, float], None]] = None,
10251018
) -> None:
1026-
"""Asynchronous flush implementation."""
1027-
if timeout is None:
1028-
timeout = self.options["shutdown_timeout"]
1019+
"""
1020+
Asynchronously wait for the current events to be sent.
10291021
1030-
self._flush_components()
1022+
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
1023+
1024+
:param callback: Is invoked with the number of pending events and the configured timeout.
1025+
"""
10311026
if self.transport is not None:
1027+
if not (
1028+
isinstance(self.transport, AsyncHttpTransport)
1029+
and hasattr(self.transport, "loop")
1030+
):
1031+
logger.debug(
1032+
"flush_async() used with non-async transport, aborting. Please use flush() instead."
1033+
)
1034+
return
1035+
if timeout is None:
1036+
timeout = self.options["shutdown_timeout"]
1037+
self._flush_components()
10321038
flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore
10331039
if flush_task is not None:
10341040
await flush_task

sentry_sdk/integrations/asyncio.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@ async def _flush() -> None:
5353
if not isinstance(client.transport, AsyncHttpTransport):
5454
return
5555

56-
task = client.close() # type: ignore
57-
if task is not None:
58-
await task
56+
await client.close_async()
5957
except Exception:
6058
logger.warning("Sentry flush failed during loop shutdown", exc_info=True)
6159

tests/test_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1732,7 +1732,7 @@ async def test_async_proxy(monkeypatch, testcase):
17321732
)
17331733
assert proxy_headers == testcase["arg_proxy_headers"]
17341734

1735-
await client.close()
1735+
await client.close_async()
17361736

17371737

17381738
@pytest.mark.parametrize(
@@ -1820,4 +1820,4 @@ async def test_async_socks_proxy(testcase):
18201820
f"but got {str(type(client.transport._pool))}"
18211821
)
18221822

1823-
await client.close()
1823+
await client.close_async()

tests/test_transport.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,9 @@ async def test_transport_works_async(
204204
capture_message("löl")
205205

206206
if client_flush_method == "close":
207-
await client.close(timeout=2.0)
207+
await client.close_async(timeout=2.0)
208208
if client_flush_method == "flush":
209-
await client.flush(timeout=2.0)
209+
await client.flush_async(timeout=2.0)
210210

211211
out, err = capsys.readouterr()
212212
assert not err and not out
@@ -230,7 +230,7 @@ async def test_transport_works_async(
230230
# Therefore, we need to explicitly close the client to clean up the worker task
231231
assert any("Sending envelope" in record.msg for record in caplog.records) == debug
232232
if client_flush_method == "flush":
233-
await client.close(timeout=2.0)
233+
await client.close_async(timeout=2.0)
234234

235235

236236
@pytest.mark.parametrize(
@@ -833,7 +833,7 @@ def background_thread_work():
833833
thread.join()
834834
assert not exception_from_thread
835835
assert captured_from_thread
836-
await client.close(timeout=2.0)
836+
await client.close_async(timeout=2.0)
837837
assert capturing_server.captured
838838

839839

@@ -860,7 +860,7 @@ async def test_async_transport_event_loop_closed_scenario(
860860
)
861861

862862
client.transport.loop = original_loop
863-
await client.close(timeout=2.0)
863+
await client.close_async(timeout=2.0)
864864

865865

866866
@pytest.mark.asyncio
@@ -882,7 +882,7 @@ async def send_message(i):
882882

883883
tasks = [send_message(i) for i in range(num_messages)]
884884
await asyncio.gather(*tasks)
885-
await client.close(timeout=2.0)
885+
await client.close_async(timeout=2.0)
886886
assert len(capturing_server.captured) == num_messages
887887

888888

@@ -916,7 +916,7 @@ async def send_message(i):
916916
await asyncio.sleep(0.1)
917917
# New request should be dropped due to rate limiting
918918
assert len(capturing_server.captured) == 0
919-
await client.close(timeout=2.0)
919+
await client.close_async(timeout=2.0)
920920

921921

922922
@pytest.mark.asyncio
@@ -938,4 +938,4 @@ async def test_async_two_way_ssl_authentication():
938938
options = client.transport._get_pool_options()
939939
assert options["ssl_context"] is not None
940940

941-
await client.close()
941+
await client.close_async()

0 commit comments

Comments
 (0)