Skip to content

Commit 1dc6a65

Browse files
SNOW-2395236: renamed to use_requests_session to use_session and improved extensibility of sessionManager (#2568)
1 parent 54906d6 commit 1dc6a65

File tree

13 files changed

+108
-79
lines changed

13 files changed

+108
-79
lines changed

ci/pre-commit/check_no_native_http.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ def _check_requests_call(
869869
node.lineno,
870870
node.col_offset,
871871
ViolationType.REQUESTS_SESSION,
872-
"Direct use of requests.Session() is forbidden, use SessionManager.use_requests_session() instead",
872+
"Direct use of requests.Session() is forbidden, use SessionManager.use_session() instead",
873873
)
874874
elif ModulePattern.is_http_method(func_name):
875875
return HTTPViolation(
@@ -1039,11 +1039,9 @@ def main():
10391039
print()
10401040
print("How to fix:")
10411041
print(" - Replace requests.request() with SessionManager.request()")
1042+
print(" - Replace requests.Session() with SessionManager.use_session()")
10421043
print(
1043-
" - Replace requests.Session() with SessionManager.use_requests_session()"
1044-
)
1045-
print(
1046-
" - Replace urllib3.PoolManager/ProxyManager() with session from session_manager.use_requests_session()"
1044+
" - Replace urllib3.PoolManager/ProxyManager() with session from session_manager.use_session()"
10471045
)
10481046
print(" - Replace direct HTTP method imports with SessionManager usage")
10491047
print(" - Use SessionManager for all HTTP operations")

src/snowflake/connector/auth/_auth.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@
5353
ReauthenticationRequest,
5454
)
5555
from ..platform_detection import detect_platforms
56-
from ..session_manager import SessionManager
56+
from ..session_manager import BaseHttpConfig, HttpConfig
57+
from ..session_manager import SessionManager as SyncSessionManager
5758
from ..sqlstate import SQLSTATE_CONNECTION_WAS_NOT_ESTABLISHED
5859
from ..token_cache import TokenCache, TokenKey, TokenType
5960
from ..version import VERSION
@@ -106,8 +107,17 @@ def base_auth_data(
106107
network_timeout: int | None = None,
107108
socket_timeout: int | None = None,
108109
platform_detection_timeout_seconds: float | None = None,
109-
session_manager: SessionManager | None = None,
110+
session_manager: SyncSessionManager | None = None,
111+
http_config: BaseHttpConfig | None = None,
110112
):
113+
# Create sync SessionManager for platform detection if config is provided
114+
# Platform detection runs in threads and uses sync SessionManager
115+
if http_config is not None and session_manager is None:
116+
# Extract base fields (automatically excludes subclass-specific fields)
117+
# Note: It won't be possible to pass adapter_factory from outer async-code to this part of code
118+
sync_config = HttpConfig(**http_config.to_base_dict())
119+
session_manager = SyncSessionManager(config=sync_config)
120+
111121
return {
112122
"data": {
113123
"CLIENT_APP_ID": internal_application_name,

src/snowflake/connector/network.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ def add_retry_params(self, full_url: str) -> str:
851851
include_retry_reason = self._connection._enable_retry_reason_in_query_response
852852
include_retry_params = kwargs.pop("_include_retry_params", False)
853853

854-
with self.use_requests_session(full_url) as session:
854+
with self.use_session(full_url) as session:
855855
retry_ctx = RetryCtx(
856856
_include_retry_params=include_retry_params,
857857
_include_retry_reason=include_retry_reason,
@@ -1213,5 +1213,5 @@ def _request_exec(
12131213
except Exception as err:
12141214
raise err
12151215

1216-
def use_requests_session(self, url=None) -> Generator[Session, Any, None]:
1217-
return self.session_manager.use_requests_session(url)
1216+
def use_session(self, url=None) -> Generator[Session, Any, None]:
1217+
return self.session_manager.use_session(url)

src/snowflake/connector/ocsp_snowflake.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ def _download_ocsp_response_cache(ocsp, url, do_retry: bool = True) -> bool:
552552
session_manager = get_current_session_manager(
553553
use_pooling=False
554554
) or SessionManager(use_pooling=False)
555-
with session_manager.use_requests_session() as session:
555+
with session_manager.use_session() as session:
556556
max_retry = SnowflakeOCSP.OCSP_CACHE_SERVER_MAX_RETRY if do_retry else 1
557557
sleep_time = 1
558558
backoff = exponential_backoff()()
@@ -1648,7 +1648,7 @@ def _fetch_ocsp_response(
16481648
if context_session_manager is not None
16491649
else SessionManager(use_pooling=False)
16501650
)
1651-
with session_manager.use_requests_session() as session:
1651+
with session_manager.use_session() as session:
16521652
max_retry = sf_max_retry if do_retry else 1
16531653
sleep_time = 1
16541654
backoff = exponential_backoff()()

src/snowflake/connector/platform_detection.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,9 @@ def detect_platforms(
412412

413413
if session_manager is None:
414414
# This should never happen - we expect session manager to be passed from the outer scope
415+
logger.debug(
416+
"No session manager provided. HTTP settings may not be preserved. Using default."
417+
)
415418
session_manager = SessionManager(use_pooling=False, max_retries=0)
416419

417420
# Run environment-only checks synchronously (no network calls, no threading overhead)

src/snowflake/connector/result_batch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,14 +360,14 @@ def _download(
360360
and connection.rest.session_manager is not None
361361
):
362362
# If connection was explicitly passed and not closed yet - we can reuse SessionManager with session pooling
363-
with connection.rest.use_requests_session() as session:
363+
with connection.rest.use_session() as session:
364364
logger.debug(
365365
f"downloading result batch id: {self.id} with existing session {session}"
366366
)
367367
response = session.request("get", **request_data)
368368
elif self._session_manager is not None:
369369
# If connection is not accessible or was already closed, but cursors are now used to fetch the data - we will only reuse the http setup (through cloned SessionManager without session pooling)
370-
with self._session_manager.use_requests_session() as session:
370+
with self._session_manager.use_session() as session:
371371
response = session.request("get", **request_data)
372372
else:
373373
# If there was no session manager cloned, then we are using a default Session Manager setup, since it is very unlikely to enter this part outside of testing

src/snowflake/connector/session_manager.py

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import functools
77
import itertools
88
import logging
9-
from dataclasses import dataclass, field, replace
10-
from typing import TYPE_CHECKING, Any, Callable, Generator, Mapping
9+
from dataclasses import asdict, dataclass, field, fields, replace
10+
from typing import TYPE_CHECKING, Any, Callable, Generator, Generic, Mapping, TypeVar
1111

1212
from .compat import urlparse
1313
from .proxy import get_proxy_url
@@ -26,6 +26,9 @@
2626
logger = logging.getLogger(__name__)
2727
REQUESTS_RETRY = 1 # requests library builtin retry
2828

29+
# Generic type for session objects (requests.Session, aiohttp.ClientSession, etc.) - no specific interface is required
30+
SessionT = TypeVar("SessionT")
31+
2932

3033
def _propagate_session_manager_to_ocsp(generator_func):
3134
"""Decorator: push self into ssl_wrap_socket ContextVar for OCSP duration.
@@ -122,23 +125,34 @@ def __call__(self, *args, **kwargs) -> ProxySupportAdapter:
122125

123126

124127
@dataclass(frozen=True)
125-
class HttpConfig:
128+
class BaseHttpConfig:
126129
"""Immutable HTTP configuration shared by SessionManager instances."""
127130

128-
adapter_factory: Callable[..., HTTPAdapter] = field(
129-
default_factory=ProxySupportAdapterFactory
130-
)
131131
use_pooling: bool = True
132132
max_retries: int | Retry | None = REQUESTS_RETRY
133133
proxy_host: str | None = None
134134
proxy_port: str | None = None
135135
proxy_user: str | None = None
136136
proxy_password: str | None = None
137137

138-
def copy_with(self, **overrides: Any) -> HttpConfig:
139-
"""Return a new HttpConfig with overrides applied."""
138+
def copy_with(self, **overrides: Any) -> BaseHttpConfig:
139+
"""Return a new config with overrides applied."""
140140
return replace(self, **overrides)
141141

142+
def to_base_dict(self) -> dict[str, Any]:
143+
"""Extract only BaseHttpConfig fields as a dict, excluding subclass-specific fields."""
144+
base_field_names = {f.name for f in fields(BaseHttpConfig)}
145+
return {k: v for k, v in asdict(self).items() if k in base_field_names}
146+
147+
148+
@dataclass(frozen=True)
149+
class HttpConfig(BaseHttpConfig):
150+
"""HTTP configuration specific to requests library."""
151+
152+
adapter_factory: Callable[..., HTTPAdapter] = field(
153+
default_factory=ProxySupportAdapterFactory
154+
)
155+
142156
def get_adapter(self, **override_adapter_factory_kwargs) -> HTTPAdapter:
143157
# We pass here only chosen attributes as kwargs to make the arguments received by the factory as compliant with the HttpAdapter constructor interface as possible.
144158
# We could consider passing the whole HttpConfig as kwarg to the factory if necessary in the future.
@@ -156,9 +170,9 @@ def get_adapter(self, **override_adapter_factory_kwargs) -> HTTPAdapter:
156170
return self.adapter_factory(**self_kwargs_for_adapter_factory)
157171

158172

159-
class SessionPool:
173+
class SessionPool(Generic[SessionT]):
160174
"""
161-
Component responsible for storing and reusing established instances of requests.Session class.
175+
Component responsible for storing and reusing established session instances.
162176
163177
This approach is especially useful in scenarios where multiple requests would have to be sent
164178
to the same host in short period of time. Instead of repeatedly establishing a new TCP connection
@@ -167,15 +181,17 @@ class SessionPool:
167181
168182
Sessions are created using the factory method make_session of a passed instance of the
169183
SessionManager class.
184+
185+
Generic over SessionT to support different session types (requests.Session, aiohttp.ClientSession, etc.)
170186
"""
171187

172188
def __init__(self, manager: SessionManager) -> None:
173189
# A stack of the idle sessions
174-
self._idle_sessions = []
175-
self._active_sessions = set()
190+
self._idle_sessions: list[SessionT] = []
191+
self._active_sessions: set[SessionT] = set()
176192
self._manager = manager
177193

178-
def get_session(self) -> Session:
194+
def get_session(self) -> SessionT:
179195
"""Returns a session from the session pool or creates a new one."""
180196
try:
181197
session = self._idle_sessions.pop()
@@ -184,7 +200,7 @@ def get_session(self) -> Session:
184200
self._active_sessions.add(session)
185201
return session
186202

187-
def return_session(self, session: Session) -> None:
203+
def return_session(self, session: SessionT) -> None:
188204
"""Places an active session back into the idle session stack."""
189205
try:
190206
self._active_sessions.remove(session)
@@ -249,11 +265,11 @@ class _RequestVerbsUsingSessionMixin(abc.ABC):
249265
"""
250266
Mixin that provides HTTP methods (get, post, put, etc.) mirroring requests.Session, maintaining their default argument behavior (e.g., HEAD uses allow_redirects=False).
251267
These wrappers manage the SessionManager's use of pooled/non-pooled sessions and delegate the actual request to the corresponding session.<verb>() method.
252-
The subclass must implement use_requests_session to yield a *requests.Session* instance.
268+
The subclass must implement use_session to yield a *requests.Session* instance.
253269
"""
254270

255271
@abc.abstractmethod
256-
def use_requests_session(self, url: str, use_pooling: bool) -> Session: ...
272+
def use_session(self, url: str, use_pooling: bool) -> Session: ...
257273

258274
def get(
259275
self,
@@ -264,7 +280,7 @@ def get(
264280
use_pooling: bool | None = None,
265281
**kwargs,
266282
):
267-
with self.use_requests_session(url, use_pooling) as session:
283+
with self.use_session(url, use_pooling) as session:
268284
return session.get(url, headers=headers, timeout=timeout, **kwargs)
269285

270286
def options(
@@ -276,7 +292,7 @@ def options(
276292
use_pooling: bool | None = None,
277293
**kwargs,
278294
):
279-
with self.use_requests_session(url, use_pooling) as session:
295+
with self.use_session(url, use_pooling) as session:
280296
return session.options(url, headers=headers, timeout=timeout, **kwargs)
281297

282298
def head(
@@ -288,7 +304,7 @@ def head(
288304
use_pooling: bool | None = None,
289305
**kwargs,
290306
):
291-
with self.use_requests_session(url, use_pooling) as session:
307+
with self.use_session(url, use_pooling) as session:
292308
return session.head(url, headers=headers, timeout=timeout, **kwargs)
293309

294310
def post(
@@ -302,7 +318,7 @@ def post(
302318
json=None,
303319
**kwargs,
304320
):
305-
with self.use_requests_session(url, use_pooling) as session:
321+
with self.use_session(url, use_pooling) as session:
306322
return session.post(
307323
url,
308324
headers=headers,
@@ -322,7 +338,7 @@ def put(
322338
data=None,
323339
**kwargs,
324340
):
325-
with self.use_requests_session(url, use_pooling) as session:
341+
with self.use_session(url, use_pooling) as session:
326342
return session.put(
327343
url, headers=headers, timeout=timeout, data=data, **kwargs
328344
)
@@ -337,7 +353,7 @@ def patch(
337353
data=None,
338354
**kwargs,
339355
):
340-
with self.use_requests_session(url, use_pooling) as session:
356+
with self.use_session(url, use_pooling) as session:
341357
return session.patch(
342358
url, headers=headers, timeout=timeout, data=data, **kwargs
343359
)
@@ -351,7 +367,7 @@ def delete(
351367
use_pooling: bool | None = None,
352368
**kwargs,
353369
):
354-
with self.use_requests_session(url, use_pooling) as session:
370+
with self.use_session(url, use_pooling) as session:
355371
return session.delete(url, headers=headers, timeout=timeout, **kwargs)
356372

357373

@@ -466,7 +482,7 @@ def make_session(self) -> Session:
466482

467483
@contextlib.contextmanager
468484
@_propagate_session_manager_to_ocsp
469-
def use_requests_session(
485+
def use_session(
470486
self, url: str | bytes | None = None, use_pooling: bool | None = None
471487
) -> Generator[Session, Any, None]:
472488
use_pooling = use_pooling if use_pooling is not None else self.use_pooling
@@ -500,7 +516,7 @@ def request(
500516
This wraps :pymeth:`use_session` so callers don’t have to manage the
501517
context manager themselves.
502518
"""
503-
with self.use_requests_session(url, use_pooling) as session:
519+
with self.use_session(url, use_pooling) as session:
504520
return session.request(
505521
method=method.upper(),
506522
url=url,

src/snowflake/connector/storage_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def _send_request_with_retry(
289289
rest_kwargs["timeout"] = (REQUEST_CONNECTION_TIMEOUT, REQUEST_READ_TIMEOUT)
290290
try:
291291
if conn:
292-
with conn.rest.use_requests_session(url=url) as session:
292+
with conn.rest.use_session(url=url) as session:
293293
logger.debug(f"storage client request with session {session}")
294294
response = session.request(verb, url, **rest_kwargs)
295295
else:

src/snowflake/connector/telemetry_oob.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ def _upload_payload(self, payload) -> None:
482482
# This logger guarantees the payload won't be masked. Testing purpose.
483483
rt_plain_logger.debug(f"OOB telemetry data being sent is {payload}")
484484

485-
# TODO(SNOW-2259522): Telemetry OOB is currently disabled. If Telemetry OOB is to be re-enabled, this HTTP call must be routed through the connection_argument.session_manager.use_requests_session(use_pooling) (so the SessionManager instance attached to the connection which initialization's fail most likely triggered this telemetry log). It would allow to pick up proxy configuration & custom headers (see tickets SNOW-694457 and SNOW-2203079).
485+
# TODO(SNOW-2259522): Telemetry OOB is currently disabled. If Telemetry OOB is to be re-enabled, this HTTP call must be routed through the connection_argument.session_manager.use_session(use_pooling) (so the SessionManager instance attached to the connection which initialization's fail most likely triggered this telemetry log). It would allow to pick up proxy configuration & custom headers (see tickets SNOW-694457 and SNOW-2203079).
486486
with requests.Session() as session:
487487
headers = {
488488
"Content-type": "application/json",

test/integ/pandas_it/test_arrow_pandas.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,8 +1429,8 @@ def test_sessions_used(conn_cnx, fetch_fn_name, pass_connection):
14291429

14301430
# check that sessions are used when connection is supplied
14311431
with mock.patch(
1432-
"snowflake.connector.network.SnowflakeRestful.use_requests_session",
1433-
side_effect=cnx._rest.use_requests_session,
1432+
"snowflake.connector.network.SnowflakeRestful.use_session",
1433+
side_effect=cnx._rest.use_session,
14341434
) as get_session_mock:
14351435
fetch_fn(connection=connection)
14361436
assert get_session_mock.call_count == (1 if pass_connection else 0)

0 commit comments

Comments
 (0)