Skip to content

Commit b1eff8b

Browse files
committed
Merge branch 'srothh/async-transport-integration' into srothh/async-task-identifier
2 parents 7ae7f12 + f44f690 commit b1eff8b

File tree

3 files changed

+135
-114
lines changed

3 files changed

+135
-114
lines changed

sentry_sdk/transport.py

Lines changed: 118 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -590,9 +590,115 @@ def flush(
590590
self._worker.flush(timeout, callback)
591591

592592

593+
class HttpTransport(BaseHttpTransport):
594+
if TYPE_CHECKING:
595+
_pool: Union[PoolManager, ProxyManager]
596+
597+
def _get_pool_options(self: Self) -> Dict[str, Any]:
598+
599+
num_pools = self.options.get("_experiments", {}).get("transport_num_pools")
600+
options = {
601+
"num_pools": 2 if num_pools is None else int(num_pools),
602+
"cert_reqs": "CERT_REQUIRED",
603+
"timeout": urllib3.Timeout(total=self.TIMEOUT),
604+
}
605+
606+
socket_options: Optional[List[Tuple[int, int, int | bytes]]] = None
607+
608+
if self.options["socket_options"] is not None:
609+
socket_options = self.options["socket_options"]
610+
611+
if self.options["keep_alive"]:
612+
if socket_options is None:
613+
socket_options = []
614+
615+
used_options = {(o[0], o[1]) for o in socket_options}
616+
for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
617+
if (default_option[0], default_option[1]) not in used_options:
618+
socket_options.append(default_option)
619+
620+
if socket_options is not None:
621+
options["socket_options"] = socket_options
622+
623+
options["ca_certs"] = (
624+
self.options["ca_certs"] # User-provided bundle from the SDK init
625+
or os.environ.get("SSL_CERT_FILE")
626+
or os.environ.get("REQUESTS_CA_BUNDLE")
627+
or certifi.where()
628+
)
629+
630+
options["cert_file"] = self.options["cert_file"] or os.environ.get(
631+
"CLIENT_CERT_FILE"
632+
)
633+
options["key_file"] = self.options["key_file"] or os.environ.get(
634+
"CLIENT_KEY_FILE"
635+
)
636+
637+
return options
638+
639+
def _make_pool(self: Self) -> Union[PoolManager, ProxyManager]:
640+
if self.parsed_dsn is None:
641+
raise ValueError("Cannot create HTTP-based transport without valid DSN")
642+
643+
proxy = None
644+
no_proxy = self._in_no_proxy(self.parsed_dsn)
645+
646+
# try HTTPS first
647+
https_proxy = self.options["https_proxy"]
648+
if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
649+
proxy = https_proxy or (not no_proxy and getproxies().get("https"))
650+
651+
# maybe fallback to HTTP proxy
652+
http_proxy = self.options["http_proxy"]
653+
if not proxy and (http_proxy != ""):
654+
proxy = http_proxy or (not no_proxy and getproxies().get("http"))
655+
656+
opts = self._get_pool_options()
657+
658+
if proxy:
659+
proxy_headers = self.options["proxy_headers"]
660+
if proxy_headers:
661+
opts["proxy_headers"] = proxy_headers
662+
663+
if proxy.startswith("socks"):
664+
use_socks_proxy = True
665+
try:
666+
# Check if PySocks dependency is available
667+
from urllib3.contrib.socks import SOCKSProxyManager
668+
except ImportError:
669+
use_socks_proxy = False
670+
logger.warning(
671+
"You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.",
672+
proxy,
673+
)
674+
675+
if use_socks_proxy:
676+
return SOCKSProxyManager(proxy, **opts)
677+
else:
678+
return urllib3.PoolManager(**opts)
679+
else:
680+
return urllib3.ProxyManager(proxy, **opts)
681+
else:
682+
return urllib3.PoolManager(**opts)
683+
684+
def _request(
685+
self: Self,
686+
method: str,
687+
endpoint_type: EndpointType,
688+
body: Any,
689+
headers: Mapping[str, str],
690+
) -> urllib3.BaseHTTPResponse:
691+
return self._pool.request(
692+
method,
693+
self._auth.get_api_url(endpoint_type),
694+
body=body,
695+
headers=headers,
696+
)
697+
698+
593699
if not ASYNC_TRANSPORT_ENABLED:
594700
# Sorry, no AsyncHttpTransport for you
595-
AsyncHttpTransport = BaseHttpTransport
701+
AsyncHttpTransport = HttpTransport
596702

597703
else:
598704

@@ -807,112 +913,6 @@ def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore
807913
return None
808914

809915

810-
class HttpTransport(BaseHttpTransport):
811-
if TYPE_CHECKING:
812-
_pool: Union[PoolManager, ProxyManager]
813-
814-
def _get_pool_options(self: Self) -> Dict[str, Any]:
815-
816-
num_pools = self.options.get("_experiments", {}).get("transport_num_pools")
817-
options = {
818-
"num_pools": 2 if num_pools is None else int(num_pools),
819-
"cert_reqs": "CERT_REQUIRED",
820-
"timeout": urllib3.Timeout(total=self.TIMEOUT),
821-
}
822-
823-
socket_options: Optional[List[Tuple[int, int, int | bytes]]] = None
824-
825-
if self.options["socket_options"] is not None:
826-
socket_options = self.options["socket_options"]
827-
828-
if self.options["keep_alive"]:
829-
if socket_options is None:
830-
socket_options = []
831-
832-
used_options = {(o[0], o[1]) for o in socket_options}
833-
for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
834-
if (default_option[0], default_option[1]) not in used_options:
835-
socket_options.append(default_option)
836-
837-
if socket_options is not None:
838-
options["socket_options"] = socket_options
839-
840-
options["ca_certs"] = (
841-
self.options["ca_certs"] # User-provided bundle from the SDK init
842-
or os.environ.get("SSL_CERT_FILE")
843-
or os.environ.get("REQUESTS_CA_BUNDLE")
844-
or certifi.where()
845-
)
846-
847-
options["cert_file"] = self.options["cert_file"] or os.environ.get(
848-
"CLIENT_CERT_FILE"
849-
)
850-
options["key_file"] = self.options["key_file"] or os.environ.get(
851-
"CLIENT_KEY_FILE"
852-
)
853-
854-
return options
855-
856-
def _make_pool(self: Self) -> Union[PoolManager, ProxyManager]:
857-
if self.parsed_dsn is None:
858-
raise ValueError("Cannot create HTTP-based transport without valid DSN")
859-
860-
proxy = None
861-
no_proxy = self._in_no_proxy(self.parsed_dsn)
862-
863-
# try HTTPS first
864-
https_proxy = self.options["https_proxy"]
865-
if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
866-
proxy = https_proxy or (not no_proxy and getproxies().get("https"))
867-
868-
# maybe fallback to HTTP proxy
869-
http_proxy = self.options["http_proxy"]
870-
if not proxy and (http_proxy != ""):
871-
proxy = http_proxy or (not no_proxy and getproxies().get("http"))
872-
873-
opts = self._get_pool_options()
874-
875-
if proxy:
876-
proxy_headers = self.options["proxy_headers"]
877-
if proxy_headers:
878-
opts["proxy_headers"] = proxy_headers
879-
880-
if proxy.startswith("socks"):
881-
use_socks_proxy = True
882-
try:
883-
# Check if PySocks dependency is available
884-
from urllib3.contrib.socks import SOCKSProxyManager
885-
except ImportError:
886-
use_socks_proxy = False
887-
logger.warning(
888-
"You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.",
889-
proxy,
890-
)
891-
892-
if use_socks_proxy:
893-
return SOCKSProxyManager(proxy, **opts)
894-
else:
895-
return urllib3.PoolManager(**opts)
896-
else:
897-
return urllib3.ProxyManager(proxy, **opts)
898-
else:
899-
return urllib3.PoolManager(**opts)
900-
901-
def _request(
902-
self: Self,
903-
method: str,
904-
endpoint_type: EndpointType,
905-
body: Any,
906-
headers: Mapping[str, str],
907-
) -> urllib3.BaseHTTPResponse:
908-
return self._pool.request(
909-
method,
910-
self._auth.get_api_url(endpoint_type),
911-
body=body,
912-
headers=headers,
913-
)
914-
915-
916916
if not HTTP2_ENABLED:
917917
# Sorry, no Http2Transport for you
918918
class Http2Transport(HttpTransport):
@@ -1053,14 +1053,24 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
10531053

10541054
use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
10551055
use_async_transport = options.get("_experiments", {}).get("transport_async", False)
1056+
async_integration = any(
1057+
integration.__class__.__name__ == "AsyncioIntegration"
1058+
for integration in options.get("integrations") or []
1059+
)
1060+
10561061
# By default, we use the http transport class
10571062
transport_cls: Type[Transport] = (
10581063
Http2Transport if use_http2_transport else HttpTransport
10591064
)
10601065
if use_async_transport and ASYNC_TRANSPORT_ENABLED:
10611066
try:
10621067
asyncio.get_running_loop()
1063-
transport_cls = AsyncHttpTransport
1068+
if async_integration:
1069+
transport_cls = AsyncHttpTransport
1070+
else:
1071+
logger.warning(
1072+
"You tried to use AsyncHttpTransport but the AsyncioIntegration is not enabled. Falling back to sync transport."
1073+
)
10641074
except RuntimeError:
10651075
# No event loop running, fall back to sync transport
10661076
logger.warning("No event loop running, falling back to sync transport.")

tests/test_client.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from sentry_sdk.spotlight import DEFAULT_SPOTLIGHT_URL
2424
from sentry_sdk.utils import capture_internal_exception
2525
from sentry_sdk.integrations.executing import ExecutingIntegration
26+
from sentry_sdk.integrations.asyncio import AsyncioIntegration
27+
2628
from sentry_sdk.transport import Transport, AsyncHttpTransport
2729
from sentry_sdk.serializer import MAX_DATABAG_BREADTH
2830
from sentry_sdk.consts import DEFAULT_MAX_BREADCRUMBS, DEFAULT_MAX_VALUE_LENGTH
@@ -1693,7 +1695,10 @@ async def test_async_proxy(monkeypatch, testcase):
16931695
if testcase.get("env_no_proxy") is not None:
16941696
monkeypatch.setenv("NO_PROXY", testcase["env_no_proxy"])
16951697

1696-
kwargs = {"_experiments": {"transport_async": True}}
1698+
kwargs = {
1699+
"_experiments": {"transport_async": True},
1700+
"integrations": [AsyncioIntegration()],
1701+
}
16971702

16981703
if testcase["arg_http_proxy"] is not None:
16991704
kwargs["http_proxy"] = testcase["arg_http_proxy"]
@@ -1795,7 +1800,10 @@ async def test_async_socks_proxy(testcase):
17951800
# These are just the same tests as the sync ones, but they need to be run in an event loop
17961801
# and respect the shutdown behavior of the async transport
17971802

1798-
kwargs = {"_experiments": {"transport_async": True}}
1803+
kwargs = {
1804+
"_experiments": {"transport_async": True},
1805+
"integrations": [AsyncioIntegration()],
1806+
}
17991807

18001808
if testcase["arg_http_proxy"] is not None:
18011809
kwargs["http_proxy"] = testcase["arg_http_proxy"]

tests/test_transport.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
AsyncHttpTransport,
3434
)
3535
from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger
36+
from sentry_sdk.integrations.asyncio import AsyncioIntegration
3637

3738

3839
server = None
@@ -183,6 +184,7 @@ async def test_transport_works_async(
183184
client = make_client(
184185
debug=debug,
185186
_experiments=experiments,
187+
integrations=[AsyncioIntegration()],
186188
)
187189

188190
if use_pickle:
@@ -812,7 +814,7 @@ async def test_async_transport_background_thread_capture(
812814
"""Test capture_envelope from background threads uses run_coroutine_threadsafe"""
813815
caplog.set_level(logging.DEBUG)
814816
experiments = {"transport_async": True}
815-
client = make_client(_experiments=experiments)
817+
client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
816818
assert isinstance(client.transport, AsyncHttpTransport)
817819
sentry_sdk.get_global_scope().set_client(client)
818820
captured_from_thread = []
@@ -843,7 +845,7 @@ async def test_async_transport_event_loop_closed_scenario(
843845
"""Test behavior when trying to capture after event loop context ends"""
844846
caplog.set_level(logging.DEBUG)
845847
experiments = {"transport_async": True}
846-
client = make_client(_experiments=experiments)
848+
client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
847849
sentry_sdk.get_global_scope().set_client(client)
848850
original_loop = client.transport.loop
849851

@@ -869,7 +871,7 @@ async def test_async_transport_concurrent_requests(
869871
"""Test multiple simultaneous envelope submissions"""
870872
caplog.set_level(logging.DEBUG)
871873
experiments = {"transport_async": True}
872-
client = make_client(_experiments=experiments)
874+
client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
873875
assert isinstance(client.transport, AsyncHttpTransport)
874876
sentry_sdk.get_global_scope().set_client(client)
875877

@@ -891,7 +893,7 @@ async def test_async_transport_rate_limiting_with_concurrency(
891893
):
892894
"""Test async transport rate limiting with concurrent requests"""
893895
experiments = {"transport_async": True}
894-
client = make_client(_experiments=experiments)
896+
client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
895897

896898
assert isinstance(client.transport, AsyncHttpTransport)
897899
sentry_sdk.get_global_scope().set_client(client)
@@ -929,6 +931,7 @@ async def test_async_two_way_ssl_authentication():
929931
cert_file=cert_file,
930932
key_file=key_file,
931933
_experiments={"transport_async": True},
934+
integrations=[AsyncioIntegration()],
932935
)
933936
assert isinstance(client.transport, AsyncHttpTransport)
934937

0 commit comments

Comments
 (0)