Skip to content

Commit 315f8dc

Browse files
committed
Merge branch 'srothh/transport-class-hierarchy' into srothh/async-task-identifier
2 parents df1ceaf + 9a2c80c commit 315f8dc

File tree

5 files changed

+87
-84
lines changed

5 files changed

+87
-84
lines changed

sentry_sdk/api.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,14 @@ def flush(
226226
return get_client().flush(timeout=timeout, callback=callback)
227227

228228

229+
@clientmethod
230+
async def flush_async(
231+
timeout: Optional[float] = None,
232+
callback: Optional[Callable[[int, float], None]] = None,
233+
) -> None:
234+
return await get_client().flush_async(timeout=timeout, callback=callback)
235+
236+
229237
def start_span(**kwargs: Any) -> Span:
230238
"""
231239
Start and return a span.

sentry_sdk/client.py

Lines changed: 68 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
from sentry_sdk.integrations.dedupe import DedupeIntegration
3939
from sentry_sdk.sessions import SessionFlusher
4040
from sentry_sdk.envelope import Envelope
41-
from sentry_sdk.utils import mark_sentry_task_internal
4241

4342
from sentry_sdk.profiler.continuous_profiler import setup_continuous_profiler
4443
from sentry_sdk.profiler.transaction_profiler import (
@@ -216,6 +215,12 @@ def close(self, *args: Any, **kwargs: Any) -> None:
216215
def flush(self, *args: Any, **kwargs: Any) -> None:
217216
return None
218217

218+
async def close_async(self, *args: Any, **kwargs: Any) -> None:
219+
return None
220+
221+
async def flush_async(self, *args: Any, **kwargs: Any) -> None:
222+
return None
223+
219224
def __enter__(self) -> BaseClient:
220225
return self
221226

@@ -919,14 +924,6 @@ def get_integration(
919924

920925
return self.integrations.get(integration_name)
921926

922-
def _close_transport(self) -> Optional[asyncio.Task[None]]:
923-
"""Close transport and return cleanup task if any."""
924-
if self.transport is not None:
925-
cleanup_task = self.transport.kill() # type: ignore
926-
self.transport = None
927-
return cleanup_task
928-
return None
929-
930927
def _close_components(self) -> None:
931928
"""Kill all client components in the correct order."""
932929
self.session_flusher.kill()
@@ -935,14 +932,7 @@ def _close_components(self) -> None:
935932
if self.monitor:
936933
self.monitor.kill()
937934

938-
async def _close_components_async(self) -> None:
939-
"""Async version of _close_components that properly awaits transport cleanup."""
940-
self._close_components()
941-
cleanup_task = self._close_transport()
942-
if cleanup_task is not None:
943-
await cleanup_task
944-
945-
def close( # type: ignore[override]
935+
def close(
946936
self,
947937
timeout: Optional[float] = None,
948938
callback: Optional[Callable[[int, float], None]] = None,
@@ -951,40 +941,45 @@ def close( # type: ignore[override]
951941
Close the client and shut down the transport. Arguments have the same
952942
semantics as :py:meth:`Client.flush`. When using the async transport, close needs to be awaited to block.
953943
"""
954-
955-
async def _flush_and_close(
956-
timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
957-
) -> None:
958-
959-
await self._flush_async(timeout=timeout, callback=callback)
960-
await self._close_components_async()
961-
962944
if self.transport is not None:
963945
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
964946
self.transport, "loop"
965947
):
948+
logger.debug(
949+
"close() used with AsyncHttpTransport, aborting. Please use close_async() instead."
950+
)
951+
return
952+
self.flush(timeout=timeout, callback=callback)
953+
self._close_components()
954+
self.transport.kill()
955+
self.transport = None
966956

967-
try:
968-
with mark_sentry_task_internal():
969-
flush_task = self.transport.loop.create_task(
970-
_flush_and_close(timeout, callback)
971-
)
972-
except RuntimeError:
973-
# Shutdown the components anyway
974-
self._close_components()
975-
self._close_transport()
976-
logger.warning("Event loop not running, aborting close.")
977-
return None
978-
# Enforce flush before shutdown
979-
return flush_task
980-
else:
981-
self.flush(timeout=timeout, callback=callback)
982-
self._close_components()
983-
self._close_transport()
984-
985-
return None
957+
async def close_async(
958+
self,
959+
timeout: Optional[float] = None,
960+
callback: Optional[Callable[[int, float], None]] = None,
961+
) -> None:
962+
"""
963+
Asynchronously close the client and shut down the transport. Arguments have the same
964+
semantics as :py:meth:`Client.flush_async`.
965+
"""
966+
if self.transport is not None:
967+
if not (
968+
isinstance(self.transport, AsyncHttpTransport)
969+
and hasattr(self.transport, "loop")
970+
):
971+
logger.debug(
972+
"close_async() used with non-async transport, aborting. Please use close() instead."
973+
)
974+
return
975+
await self.flush_async(timeout=timeout, callback=callback)
976+
self._close_components()
977+
kill_task = self.transport.kill() # type: ignore
978+
if kill_task is not None:
979+
await kill_task
980+
self.transport = None
986981

987-
def flush( # type: ignore[override]
982+
def flush(
988983
self,
989984
timeout: Optional[float] = None,
990985
callback: Optional[Callable[[int, float], None]] = None,
@@ -1000,38 +995,40 @@ def flush( # type: ignore[override]
1000995
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
1001996
self.transport, "loop"
1002997
):
1003-
try:
1004-
with mark_sentry_task_internal():
1005-
return self.transport.loop.create_task(
1006-
self._flush_async(timeout, callback)
1007-
)
1008-
except RuntimeError:
1009-
logger.warning("Event loop not running, aborting flush.")
1010-
return None
1011-
else:
1012-
self._flush_sync(timeout, callback)
1013-
return None
1014-
1015-
def _flush_sync(
1016-
self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
1017-
) -> None:
1018-
"""Synchronous flush implementation."""
1019-
if timeout is None:
1020-
timeout = self.options["shutdown_timeout"]
998+
logger.debug(
999+
"flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead."
1000+
)
1001+
return
1002+
if timeout is None:
1003+
timeout = self.options["shutdown_timeout"]
1004+
self._flush_components()
10211005

1022-
self._flush_components()
1023-
if self.transport is not None:
10241006
self.transport.flush(timeout=timeout, callback=callback)
10251007

1026-
async def _flush_async(
1027-
self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
1008+
async def flush_async(
1009+
self,
1010+
timeout: Optional[float] = None,
1011+
callback: Optional[Callable[[int, float], None]] = None,
10281012
) -> None:
1029-
"""Asynchronous flush implementation."""
1030-
if timeout is None:
1031-
timeout = self.options["shutdown_timeout"]
1013+
"""
1014+
Asynchronously wait for the current events to be sent.
10321015
1033-
self._flush_components()
1016+
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
1017+
1018+
:param callback: Is invoked with the number of pending events and the configured timeout.
1019+
"""
10341020
if self.transport is not None:
1021+
if not (
1022+
isinstance(self.transport, AsyncHttpTransport)
1023+
and hasattr(self.transport, "loop")
1024+
):
1025+
logger.debug(
1026+
"flush_async() used with non-async transport, aborting. Please use flush() instead."
1027+
)
1028+
return
1029+
if timeout is None:
1030+
timeout = self.options["shutdown_timeout"]
1031+
self._flush_components()
10351032
flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore
10361033
if flush_task is not None:
10371034
await flush_task

sentry_sdk/integrations/asyncio.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@ async def _flush() -> None:
5858
if not isinstance(client.transport, AsyncHttpTransport):
5959
return
6060

61-
task = client.close() # type: ignore
62-
if task is not None:
63-
await task
61+
await client.close_async()
6462
except Exception:
6563
logger.warning("Sentry flush failed during loop shutdown", exc_info=True)
6664

tests/test_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1732,7 +1732,7 @@ async def test_async_proxy(monkeypatch, testcase):
17321732
)
17331733
assert proxy_headers == testcase["arg_proxy_headers"]
17341734

1735-
await client.close()
1735+
await client.close_async()
17361736

17371737

17381738
@pytest.mark.parametrize(
@@ -1820,4 +1820,4 @@ async def test_async_socks_proxy(testcase):
18201820
f"but got {str(type(client.transport._pool))}"
18211821
)
18221822

1823-
await client.close()
1823+
await client.close_async()

tests/test_transport.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,9 @@ async def test_transport_works_async(
204204
capture_message("löl")
205205

206206
if client_flush_method == "close":
207-
await client.close(timeout=2.0)
207+
await client.close_async(timeout=2.0)
208208
if client_flush_method == "flush":
209-
await client.flush(timeout=2.0)
209+
await client.flush_async(timeout=2.0)
210210

211211
out, err = capsys.readouterr()
212212
assert not err and not out
@@ -230,7 +230,7 @@ async def test_transport_works_async(
230230
# Therefore, we need to explicitly close the client to clean up the worker task
231231
assert any("Sending envelope" in record.msg for record in caplog.records) == debug
232232
if client_flush_method == "flush":
233-
await client.close(timeout=2.0)
233+
await client.close_async(timeout=2.0)
234234

235235

236236
@pytest.mark.parametrize(
@@ -833,7 +833,7 @@ def background_thread_work():
833833
thread.join()
834834
assert not exception_from_thread
835835
assert captured_from_thread
836-
await client.close(timeout=2.0)
836+
await client.close_async(timeout=2.0)
837837
assert capturing_server.captured
838838

839839

@@ -860,7 +860,7 @@ async def test_async_transport_event_loop_closed_scenario(
860860
)
861861

862862
client.transport.loop = original_loop
863-
await client.close(timeout=2.0)
863+
await client.close_async(timeout=2.0)
864864

865865

866866
@pytest.mark.asyncio
@@ -882,7 +882,7 @@ async def send_message(i):
882882

883883
tasks = [send_message(i) for i in range(num_messages)]
884884
await asyncio.gather(*tasks)
885-
await client.close(timeout=2.0)
885+
await client.close_async(timeout=2.0)
886886
assert len(capturing_server.captured) == num_messages
887887

888888

@@ -916,7 +916,7 @@ async def send_message(i):
916916
await asyncio.sleep(0.1)
917917
# New request should be dropped due to rate limiting
918918
assert len(capturing_server.captured) == 0
919-
await client.close(timeout=2.0)
919+
await client.close_async(timeout=2.0)
920920

921921

922922
@pytest.mark.asyncio
@@ -938,4 +938,4 @@ async def test_async_two_way_ssl_authentication():
938938
options = client.transport._get_pool_options()
939939
assert options["ssl_context"] is not None
940940

941-
await client.close()
941+
await client.close_async()

0 commit comments

Comments
 (0)