Skip to content

Commit aa3a025

Browse files
SNOW-2395236: renamed to use_requests_session to use_session - to improve extensibility of code. Made SessionPool generic.
1 parent d6113ba commit aa3a025

File tree

11 files changed

+58
-49
lines changed

11 files changed

+58
-49
lines changed

ci/pre-commit/check_no_native_http.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ def _check_requests_call(
869869
node.lineno,
870870
node.col_offset,
871871
ViolationType.REQUESTS_SESSION,
872-
"Direct use of requests.Session() is forbidden, use SessionManager.use_requests_session() instead",
872+
"Direct use of requests.Session() is forbidden, use SessionManager.use_session() instead",
873873
)
874874
elif ModulePattern.is_http_method(func_name):
875875
return HTTPViolation(
@@ -1039,11 +1039,9 @@ def main():
10391039
print()
10401040
print("How to fix:")
10411041
print(" - Replace requests.request() with SessionManager.request()")
1042+
print(" - Replace requests.Session() with SessionManager.use_session()")
10421043
print(
1043-
" - Replace requests.Session() with SessionManager.use_requests_session()"
1044-
)
1045-
print(
1046-
" - Replace urllib3.PoolManager/ProxyManager() with session from session_manager.use_requests_session()"
1044+
" - Replace urllib3.PoolManager/ProxyManager() with session from session_manager.use_session()"
10471045
)
10481046
print(" - Replace direct HTTP method imports with SessionManager usage")
10491047
print(" - Use SessionManager for all HTTP operations")

src/snowflake/connector/network.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ def add_retry_params(self, full_url: str) -> str:
851851
include_retry_reason = self._connection._enable_retry_reason_in_query_response
852852
include_retry_params = kwargs.pop("_include_retry_params", False)
853853

854-
with self.use_requests_session(full_url) as session:
854+
with self.use_session(full_url) as session:
855855
retry_ctx = RetryCtx(
856856
_include_retry_params=include_retry_params,
857857
_include_retry_reason=include_retry_reason,
@@ -1213,5 +1213,5 @@ def _request_exec(
12131213
except Exception as err:
12141214
raise err
12151215

1216-
def use_requests_session(self, url=None) -> Generator[Session, Any, None]:
1217-
return self.session_manager.use_requests_session(url)
1216+
def use_session(self, url=None) -> Generator[Session, Any, None]:
1217+
return self.session_manager.use_session(url)

src/snowflake/connector/ocsp_snowflake.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ def _download_ocsp_response_cache(ocsp, url, do_retry: bool = True) -> bool:
552552
session_manager = get_current_session_manager(
553553
use_pooling=False
554554
) or SessionManager(use_pooling=False)
555-
with session_manager.use_requests_session() as session:
555+
with session_manager.use_session() as session:
556556
max_retry = SnowflakeOCSP.OCSP_CACHE_SERVER_MAX_RETRY if do_retry else 1
557557
sleep_time = 1
558558
backoff = exponential_backoff()()
@@ -1648,7 +1648,7 @@ def _fetch_ocsp_response(
16481648
if context_session_manager is not None
16491649
else SessionManager(use_pooling=False)
16501650
)
1651-
with session_manager.use_requests_session() as session:
1651+
with session_manager.use_session() as session:
16521652
max_retry = sf_max_retry if do_retry else 1
16531653
sleep_time = 1
16541654
backoff = exponential_backoff()()

src/snowflake/connector/result_batch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,14 +360,14 @@ def _download(
360360
and connection.rest.session_manager is not None
361361
):
362362
# If connection was explicitly passed and not closed yet - we can reuse SessionManager with session pooling
363-
with connection.rest.use_requests_session() as session:
363+
with connection.rest.use_session() as session:
364364
logger.debug(
365365
f"downloading result batch id: {self.id} with existing session {session}"
366366
)
367367
response = session.request("get", **request_data)
368368
elif self._session_manager is not None:
369369
# If connection is not accessible or was already closed, but cursors are now used to fetch the data - we will only reuse the http setup (through cloned SessionManager without session pooling)
370-
with self._session_manager.use_requests_session() as session:
370+
with self._session_manager.use_session() as session:
371371
response = session.request("get", **request_data)
372372
else:
373373
# If there was no session manager cloned, then we are using a default Session Manager setup, since it is very unlikely to enter this part outside of testing

src/snowflake/connector/session_manager.py

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import itertools
88
import logging
99
from dataclasses import dataclass, field, replace
10-
from typing import TYPE_CHECKING, Any, Callable, Generator, Mapping
10+
from typing import TYPE_CHECKING, Any, Callable, Generator, Generic, Mapping, TypeVar
1111

1212
from .compat import urlparse
1313
from .proxy import get_proxy_url
@@ -26,6 +26,9 @@
2626
logger = logging.getLogger(__name__)
2727
REQUESTS_RETRY = 1 # requests library builtin retry
2828

29+
# Generic type for session objects (requests.Session, aiohttp.ClientSession, etc.) - no specific interface is required
30+
SessionT = TypeVar("SessionT")
31+
2932

3033
def _propagate_session_manager_to_ocsp(generator_func):
3134
"""Decorator: push self into ssl_wrap_socket ContextVar for OCSP duration.
@@ -122,23 +125,29 @@ def __call__(self, *args, **kwargs) -> ProxySupportAdapter:
122125

123126

124127
@dataclass(frozen=True)
125-
class HttpConfig:
128+
class BaseHttpConfig:
126129
"""Immutable HTTP configuration shared by SessionManager instances."""
127130

128-
adapter_factory: Callable[..., HTTPAdapter] = field(
129-
default_factory=ProxySupportAdapterFactory
130-
)
131131
use_pooling: bool = True
132132
max_retries: int | Retry | None = REQUESTS_RETRY
133133
proxy_host: str | None = None
134134
proxy_port: str | None = None
135135
proxy_user: str | None = None
136136
proxy_password: str | None = None
137137

138-
def copy_with(self, **overrides: Any) -> HttpConfig:
139-
"""Return a new HttpConfig with overrides applied."""
138+
def copy_with(self, **overrides: Any) -> BaseHttpConfig:
139+
"""Return a new config with overrides applied."""
140140
return replace(self, **overrides)
141141

142+
143+
@dataclass(frozen=True)
144+
class HttpConfig(BaseHttpConfig):
145+
"""HTTP configuration specific to requests library."""
146+
147+
adapter_factory: Callable[..., HTTPAdapter] = field(
148+
default_factory=ProxySupportAdapterFactory
149+
)
150+
142151
def get_adapter(self, **override_adapter_factory_kwargs) -> HTTPAdapter:
143152
# We pass here only chosen attributes as kwargs to make the arguments received by the factory as compliant with the HttpAdapter constructor interface as possible.
144153
# We could consider passing the whole HttpConfig as kwarg to the factory if necessary in the future.
@@ -156,9 +165,9 @@ def get_adapter(self, **override_adapter_factory_kwargs) -> HTTPAdapter:
156165
return self.adapter_factory(**self_kwargs_for_adapter_factory)
157166

158167

159-
class SessionPool:
168+
class SessionPool(Generic[SessionT]):
160169
"""
161-
Component responsible for storing and reusing established instances of requests.Session class.
170+
Component responsible for storing and reusing established session instances.
162171
163172
This approach is especially useful in scenarios where multiple requests would have to be sent
164173
to the same host in short period of time. Instead of repeatedly establishing a new TCP connection
@@ -167,15 +176,17 @@ class SessionPool:
167176
168177
Sessions are created using the factory method make_session of a passed instance of the
169178
SessionManager class.
179+
180+
Generic over SessionT to support different session types (requests.Session, aiohttp.ClientSession, etc.)
170181
"""
171182

172183
def __init__(self, manager: SessionManager) -> None:
173184
# A stack of the idle sessions
174-
self._idle_sessions = []
175-
self._active_sessions = set()
185+
self._idle_sessions: list[SessionT] = []
186+
self._active_sessions: set[SessionT] = set()
176187
self._manager = manager
177188

178-
def get_session(self) -> Session:
189+
def get_session(self) -> SessionT:
179190
"""Returns a session from the session pool or creates a new one."""
180191
try:
181192
session = self._idle_sessions.pop()
@@ -184,7 +195,7 @@ def get_session(self) -> Session:
184195
self._active_sessions.add(session)
185196
return session
186197

187-
def return_session(self, session: Session) -> None:
198+
def return_session(self, session: SessionT) -> None:
188199
"""Places an active session back into the idle session stack."""
189200
try:
190201
self._active_sessions.remove(session)
@@ -249,11 +260,11 @@ class _RequestVerbsUsingSessionMixin(abc.ABC):
249260
"""
250261
Mixin that provides HTTP methods (get, post, put, etc.) mirroring requests.Session, maintaining their default argument behavior (e.g., HEAD uses allow_redirects=False).
251262
These wrappers manage the SessionManager's use of pooled/non-pooled sessions and delegate the actual request to the corresponding session.<verb>() method.
252-
The subclass must implement use_requests_session to yield a *requests.Session* instance.
263+
The subclass must implement use_session to yield a *requests.Session* instance.
253264
"""
254265

255266
@abc.abstractmethod
256-
def use_requests_session(self, url: str, use_pooling: bool) -> Session: ...
267+
def use_session(self, url: str, use_pooling: bool) -> Session: ...
257268

258269
def get(
259270
self,
@@ -264,7 +275,7 @@ def get(
264275
use_pooling: bool | None = None,
265276
**kwargs,
266277
):
267-
with self.use_requests_session(url, use_pooling) as session:
278+
with self.use_session(url, use_pooling) as session:
268279
return session.get(url, headers=headers, timeout=timeout, **kwargs)
269280

270281
def options(
@@ -276,7 +287,7 @@ def options(
276287
use_pooling: bool | None = None,
277288
**kwargs,
278289
):
279-
with self.use_requests_session(url, use_pooling) as session:
290+
with self.use_session(url, use_pooling) as session:
280291
return session.options(url, headers=headers, timeout=timeout, **kwargs)
281292

282293
def head(
@@ -288,7 +299,7 @@ def head(
288299
use_pooling: bool | None = None,
289300
**kwargs,
290301
):
291-
with self.use_requests_session(url, use_pooling) as session:
302+
with self.use_session(url, use_pooling) as session:
292303
return session.head(url, headers=headers, timeout=timeout, **kwargs)
293304

294305
def post(
@@ -302,7 +313,7 @@ def post(
302313
json=None,
303314
**kwargs,
304315
):
305-
with self.use_requests_session(url, use_pooling) as session:
316+
with self.use_session(url, use_pooling) as session:
306317
return session.post(
307318
url,
308319
headers=headers,
@@ -322,7 +333,7 @@ def put(
322333
data=None,
323334
**kwargs,
324335
):
325-
with self.use_requests_session(url, use_pooling) as session:
336+
with self.use_session(url, use_pooling) as session:
326337
return session.put(
327338
url, headers=headers, timeout=timeout, data=data, **kwargs
328339
)
@@ -337,7 +348,7 @@ def patch(
337348
data=None,
338349
**kwargs,
339350
):
340-
with self.use_requests_session(url, use_pooling) as session:
351+
with self.use_session(url, use_pooling) as session:
341352
return session.patch(
342353
url, headers=headers, timeout=timeout, data=data, **kwargs
343354
)
@@ -351,7 +362,7 @@ def delete(
351362
use_pooling: bool | None = None,
352363
**kwargs,
353364
):
354-
with self.use_requests_session(url, use_pooling) as session:
365+
with self.use_session(url, use_pooling) as session:
355366
return session.delete(url, headers=headers, timeout=timeout, **kwargs)
356367

357368

@@ -466,7 +477,7 @@ def make_session(self) -> Session:
466477

467478
@contextlib.contextmanager
468479
@_propagate_session_manager_to_ocsp
469-
def use_requests_session(
480+
def use_session(
470481
self, url: str | bytes | None = None, use_pooling: bool | None = None
471482
) -> Generator[Session, Any, None]:
472483
use_pooling = use_pooling if use_pooling is not None else self.use_pooling
@@ -500,7 +511,7 @@ def request(
500511
This wraps :pymeth:`use_session` so callers don’t have to manage the
501512
context manager themselves.
502513
"""
503-
with self.use_requests_session(url, use_pooling) as session:
514+
with self.use_session(url, use_pooling) as session:
504515
return session.request(
505516
method=method.upper(),
506517
url=url,

src/snowflake/connector/storage_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def _send_request_with_retry(
289289
rest_kwargs["timeout"] = (REQUEST_CONNECTION_TIMEOUT, REQUEST_READ_TIMEOUT)
290290
try:
291291
if conn:
292-
with conn.rest.use_requests_session(url=url) as session:
292+
with conn.rest.use_session(url=url) as session:
293293
logger.debug(f"storage client request with session {session}")
294294
response = session.request(verb, url, **rest_kwargs)
295295
else:

src/snowflake/connector/telemetry_oob.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ def _upload_payload(self, payload) -> None:
482482
# This logger guarantees the payload won't be masked. Testing purpose.
483483
rt_plain_logger.debug(f"OOB telemetry data being sent is {payload}")
484484

485-
# TODO(SNOW-2259522): Telemetry OOB is currently disabled. If Telemetry OOB is to be re-enabled, this HTTP call must be routed through the connection_argument.session_manager.use_requests_session(use_pooling) (so the SessionManager instance attached to the connection which initialization's fail most likely triggered this telemetry log). It would allow to pick up proxy configuration & custom headers (see tickets SNOW-694457 and SNOW-2203079).
485+
# TODO(SNOW-2259522): Telemetry OOB is currently disabled. If Telemetry OOB is to be re-enabled, this HTTP call must be routed through the connection_argument.session_manager.use_session(use_pooling) (so the SessionManager instance attached to the connection which initialization's fail most likely triggered this telemetry log). It would allow to pick up proxy configuration & custom headers (see tickets SNOW-694457 and SNOW-2203079).
486486
with requests.Session() as session:
487487
headers = {
488488
"Content-type": "application/json",

test/integ/pandas_it/test_arrow_pandas.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,8 +1429,8 @@ def test_sessions_used(conn_cnx, fetch_fn_name, pass_connection):
14291429

14301430
# check that sessions are used when connection is supplied
14311431
with mock.patch(
1432-
"snowflake.connector.network.SnowflakeRestful.use_requests_session",
1433-
side_effect=cnx._rest.use_requests_session,
1432+
"snowflake.connector.network.SnowflakeRestful.use_session",
1433+
side_effect=cnx._rest.use_session,
14341434
) as get_session_mock:
14351435
fetch_fn(connection=connection)
14361436
assert get_session_mock.call_count == (1 if pass_connection else 0)

test/integ/test_connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ def test_ocsp_and_rest_pool_isolation(conn_cnx, disable_request_pooling):
10771077

10781078
assert rest_sm_1.sessions_map or disable_request_pooling
10791079

1080-
with rest_sm_1.use_requests_session("https://example.com"):
1080+
with rest_sm_1.use_session("https://example.com"):
10811081
ocsp_sm_1 = get_current_session_manager(create_default_if_missing=False)
10821082
assert ocsp_sm_1 is not rest_sm_1
10831083
assert ocsp_sm_1.config == rest_sm_1.config
@@ -1096,7 +1096,7 @@ def test_ocsp_and_rest_pool_isolation(conn_cnx, disable_request_pooling):
10961096
assert rest_sm_2.sessions_map or disable_request_pooling
10971097
assert rest_sm_2 is not rest_sm_1
10981098

1099-
with rest_sm_2.use_requests_session("https://example.com"):
1099+
with rest_sm_2.use_session("https://example.com"):
11001100
ocsp_sm_2 = get_current_session_manager(create_default_if_missing=False)
11011101
assert ocsp_sm_2 is not rest_sm_2
11021102
assert ocsp_sm_2.config == rest_sm_2.config

test/integ/test_cursor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,8 +1802,8 @@ def test_fetch_batches_with_sessions(conn_cnx):
18021802
num_batches = len(cur.get_result_batches())
18031803

18041804
with mock.patch(
1805-
"snowflake.connector.session_manager.SessionManager.use_requests_session",
1806-
side_effect=con._rest.session_manager.use_requests_session,
1805+
"snowflake.connector.session_manager.SessionManager.use_session",
1806+
side_effect=con._rest.session_manager.use_session,
18071807
) as get_session_mock:
18081808
result = cur.fetchall()
18091809
# all but one batch is downloaded using a session

0 commit comments

Comments
 (0)