3
3
import uuid
4
4
import random
5
5
import socket
6
+ import asyncio
6
7
from collections .abc import Mapping
7
8
from datetime import datetime , timezone
8
9
from importlib import import_module
25
26
)
26
27
from sentry_sdk .serializer import serialize
27
28
from sentry_sdk .tracing import trace
28
- from sentry_sdk .transport import HttpTransportCore , make_transport
29
+ from sentry_sdk .transport import HttpTransportCore , make_transport , AsyncHttpTransport
29
30
from sentry_sdk .consts import (
30
31
SPANDATA ,
31
32
DEFAULT_MAX_VALUE_LENGTH ,
@@ -914,36 +915,50 @@ def get_integration(
914
915
915
916
return self .integrations .get (integration_name )
916
917
918
+ def _close_components (self ) -> None :
919
+ """Kill all client components in the correct order."""
920
+ self .session_flusher .kill ()
921
+ if self .log_batcher is not None :
922
+ self .log_batcher .kill ()
923
+ if self .monitor :
924
+ self .monitor .kill ()
925
+ if self .transport is not None :
926
+ self .transport .kill ()
927
+ self .transport = None
928
+
917
929
def close (
918
930
self ,
919
931
timeout : Optional [float ] = None ,
920
932
callback : Optional [Callable [[int , float ], None ]] = None ,
921
- ) -> None :
933
+ ) -> Optional [ asyncio . Task [ None ]] :
922
934
"""
923
935
Close the client and shut down the transport. Arguments have the same
924
- semantics as :py:meth:`Client.flush`.
936
+ semantics as :py:meth:`Client.flush`. When using the async transport, close needs to be awaited to block.
925
937
"""
926
938
if self .transport is not None :
927
- self .flush ( timeout = timeout , callback = callback )
939
+ if isinstance ( self .transport , AsyncHttpTransport ):
928
940
929
- self .session_flusher .kill ()
941
+ def _on_flush_done (_ : asyncio .Task [None ]) -> None :
942
+ self ._close_components ()
930
943
931
- if self .log_batcher is not None :
932
- self .log_batcher .kill ()
933
-
934
- if self .monitor :
935
- self .monitor .kill ()
936
-
937
- self .transport .kill ()
938
- self .transport = None
944
+ flush_task = self .transport .loop .create_task (
945
+ self ._flush_async (timeout , callback )
946
+ )
947
+ # Enforce flush before shutdown
948
+ flush_task .add_done_callback (_on_flush_done )
949
+ return flush_task
950
+ else :
951
+ self .flush (timeout = timeout , callback = callback )
952
+ self ._close_components ()
953
+ return None
939
954
940
955
def flush (
941
956
self ,
942
957
timeout : Optional [float ] = None ,
943
958
callback : Optional [Callable [[int , float ], None ]] = None ,
944
- ) -> None :
959
+ ) -> Optional [ asyncio . Task [ None ]] :
945
960
"""
946
- Wait for the current events to be sent.
961
+ Wait for the current events to be sent. When using the async transport, flush needs to be awaited to block.
947
962
948
963
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
949
964
@@ -952,12 +967,28 @@ def flush(
952
967
if self .transport is not None :
953
968
if timeout is None :
954
969
timeout = self .options ["shutdown_timeout" ]
970
+
955
971
self .session_flusher .flush ()
956
972
957
973
if self .log_batcher is not None :
958
974
self .log_batcher .flush ()
959
975
960
- self .transport .flush (timeout = timeout , callback = callback )
976
+ if isinstance (self .transport , AsyncHttpTransport ):
977
+ return self .transport .loop .create_task (
978
+ self ._flush_async (timeout , callback )
979
+ )
980
+ else :
981
+ self .transport .flush (timeout = timeout , callback = callback )
982
+
983
+ return None
984
+
985
+ async def _flush_async (
986
+ self , timeout : float , callback : Optional [Callable [[int , float ], None ]]
987
+ ) -> None :
988
+ self .session_flusher .flush ()
989
+ if self .log_batcher is not None :
990
+ self .log_batcher .flush ()
991
+ await self .transport .flush_async (timeout = timeout , callback = callback ) # type: ignore
961
992
962
993
def __enter__ (self ) -> _Client :
963
994
return self
0 commit comments