15
15
from typing import TYPE_CHECKING , Any
16
16
17
17
import OpenSSL .SSL
18
+ from urllib3 .util .url import parse_url
18
19
19
20
from ..compat import (
20
21
FORBIDDEN ,
80
81
SQLSTATE_CONNECTION_REJECTED ,
81
82
SQLSTATE_CONNECTION_WAS_NOT_ESTABLISHED ,
82
83
)
83
- from ..time_util import TimeoutBackoffCtx , get_time_millis
84
+ from ..time_util import TimeoutBackoffCtx
84
85
from ._ssl_connector import SnowflakeSSLConnector
85
86
86
87
if TYPE_CHECKING :
@@ -162,6 +163,10 @@ def __init__(
162
163
self ._ocsp_mode = (
163
164
self ._connection ._ocsp_mode () if self ._connection else OCSPMode .FAIL_OPEN
164
165
)
166
+ if self ._connection .proxy_host :
167
+ self ._get_proxy_headers = lambda url : {"Host" : parse_url (url ).hostname }
168
+ else :
169
+ self ._get_proxy_headers = lambda _ : None
165
170
166
171
async def close (self ) -> None :
167
172
if hasattr (self , "_token" ):
@@ -704,11 +709,6 @@ async def _request_exec(
704
709
else :
705
710
input_data = data
706
711
707
- download_start_time = get_time_millis ()
708
- # socket timeout is constant. You should be able to receive
709
- # the response within the time. If not, ConnectReadTimeout or
710
- # ReadTimeout is raised.
711
-
712
712
# TODO: aiohttp auth parameter works differently than requests.session.request
713
713
# we can check if there's other aiohttp built-in mechanism to update this
714
714
if HEADER_AUTHORIZATION_KEY in headers :
@@ -718,26 +718,31 @@ async def _request_exec(
718
718
token = token
719
719
)
720
720
721
- # TODO: sync feature parity, parameters verify/stream in sync version
721
+ # socket timeout is constant. You should be able to receive
722
+ # the response within the time. If not, asyncio.TimeoutError is raised.
723
+
724
+ # delta compared to sync:
725
+ # - in sync, we specify "verify" to True; in aiohttp,
726
+ # the counter parameter is "ssl" and it already defaults to True
722
727
raw_ret = await session .request (
723
728
method = method ,
724
729
url = full_url ,
725
730
headers = headers ,
726
731
data = input_data ,
727
732
timeout = aiohttp .ClientTimeout (socket_timeout ),
733
+ proxy_headers = self ._get_proxy_headers (full_url ),
728
734
)
729
-
730
- download_end_time = get_time_millis ()
731
-
732
735
try :
733
736
if raw_ret .status == OK :
734
737
logger .debug ("SUCCESS" )
735
738
if is_raw_text :
736
739
ret = await raw_ret .text ()
737
740
elif is_raw_binary :
738
- content = await raw_ret .read ()
739
- ret = binary_data_handler .to_iterator (
740
- content , download_end_time - download_start_time
741
+ # check SNOW-1738595 for is_raw_binary support
742
+ raise NotImplementedError (
743
+ "reading raw binary data is not supported in asyncio connector,"
744
+ " please open a feature request issue in"
745
+ " github: https://github.com/snowflakedb/snowflake-connector-python/issues/new/choose"
741
746
)
742
747
else :
743
748
ret = await raw_ret .json ()
@@ -818,12 +823,9 @@ async def _request_exec(
818
823
819
824
def make_requests_session (self ) -> aiohttp .ClientSession :
820
825
s = aiohttp .ClientSession (
821
- connector = SnowflakeSSLConnector (snowflake_ocsp_mode = self ._ocsp_mode )
826
+ connector = SnowflakeSSLConnector (snowflake_ocsp_mode = self ._ocsp_mode ),
827
+ trust_env = True , # this is for proxy support, proxy.set_proxy will set envs and trust_env allows reading env
822
828
)
823
- # TODO: sync feature parity, proxy support
824
- # s.mount("http://", ProxySupportAdapter(max_retries=REQUESTS_RETRY))
825
- # s.mount("https://", ProxySupportAdapter(max_retries=REQUESTS_RETRY))
826
- # s._reuse_count = itertools.count()
827
829
return s
828
830
829
831
@contextlib .asynccontextmanager
0 commit comments