Skip to content

Commit 1d323ea

Browse files
mayfieldsmtakeda
authored andcommitted
Session refactor (#6)
* Thread safety refactor of http session management. Make session management explicitly managed by callers to `access_url`. This enables each caller to protect the session from threading issues in a way that best matches their particular model. For Snowflakerestful this is managed as a pool of sessions; For chunk_downloader I will implement something similar that provides for good reuse and cleanup of each session while achieving as much connection pooling as can be done safely. * Session consolidation Fairly large refactor in the spirit of consolidating HTTP call-patterns and configuration. Changed `SnowflakeRestful.access_url` from a staticmethod to a regular instance method, reborn as a `fetch`. The name change helps distinguish the type of activity being performed, but is otherwise cosmetic. Requests session pooling is manged in one place by the SnowflakeRestful instance regardless of the type of HTTP activity. This provides the best session reuse possible with consistent thread safety characteristics. Note that the `requests.Session` objects are created outside their final execution thread, but they are never used by more than one request thread at a time. Other changes: - Moved network.py logger instance to module level. - DRY optimizations for _get_request, _post_request, etc. Namely consolidation of proxy and timeout settings. - Default `fetch`'s keyword arg `token` to `network.NOTOKEN`. I believe this is semantically acceptable (or desirable) but should be reviewed. - Renamed `chunk_downloader`'s `_get_request` to `_fetch_chunk` to more aptly associate it with `SnowflakeRestful.fetch()`. - Renamed `request_thread` function to `request_exec` as it is not always a thread target.
1 parent 795a132 commit 1d323ea

File tree

4 files changed

+163
-205
lines changed

4 files changed

+163
-205
lines changed

chunk_downloader.py

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111

1212
from .errorcode import (ER_NO_ADDITIONAL_CHUNK, ER_CHUNK_DOWNLOAD_FAILED)
1313
from .errors import (Error, OperationalError)
14-
from .network import (SnowflakeRestful, NO_TOKEN, MAX_CONNECTION_POOL)
15-
from .ssl_wrap_socket import (set_proxies)
1614

1715
DEFAULT_REQUEST_TIMEOUT = 3600
1816
DEFAULT_CLIENT_RESULT_PREFETCH_SLOTS = 2
@@ -126,9 +124,7 @@ def _download_chunk(self, idx):
126124

127125
logger.debug(u"started getting the result set %s: %s",
128126
idx + 1, self._chunks[idx].url)
129-
result_data = self._get_request(
130-
self._chunks[idx].url,
131-
headers, max_connection_pool=self._effective_threads)
127+
result_data = self._fetch_chunk(self._chunks[idx].url, headers)
132128
logger.debug(u"finished getting the result set %s: %s",
133129
idx + 1, self._chunks[idx].url)
134130

@@ -255,35 +251,15 @@ def __del__(self):
255251
# ignore all errors in the destructor
256252
pass
257253

258-
def _get_request(
259-
self, url, headers,
260-
is_raw_binary_iterator=True,
261-
max_connection_pool=MAX_CONNECTION_POOL):
254+
def _fetch_chunk(self, url, headers):
262255
"""
263-
GET request for Large Result set chunkloader
256+
Fetch the chunk from S3.
264257
"""
265-
# sharing the proxy and certificate
266-
proxies = set_proxies(
267-
self._connection.rest._proxy_host,
268-
self._connection.rest._proxy_port,
269-
self._connection.rest._proxy_user,
270-
self._connection.rest._proxy_password)
271-
272-
logger.debug(u'proxies=%s, url=%s', proxies, url)
273-
274-
return SnowflakeRestful.access_url(
275-
self._connection,
276-
self,
277-
u'get',
278-
full_url=url,
279-
headers=headers,
280-
data=None,
281-
proxies=proxies,
282-
timeout=(self._connection._connect_timeout,
283-
self._connection._connect_timeout,
284-
DEFAULT_REQUEST_TIMEOUT),
285-
token=NO_TOKEN,
286-
is_raw_binary=True,
287-
is_raw_binary_iterator=is_raw_binary_iterator,
288-
max_connection_pool=max_connection_pool,
258+
timeouts = (
259+
self._connection._connect_timeout,
260+
self._connection._connect_timeout,
261+
DEFAULT_REQUEST_TIMEOUT
262+
)
263+
return self._connection.rest.fetch(u'get', url, headers,
264+
timeouts=timeouts, is_raw_binary=True, is_raw_binary_iterator=True,
289265
use_ijson=self._use_ijson)

connection.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import logging
3636
from logging import getLogger
3737

38+
3839
# default configs
3940
DEFAULT_CONFIGURATION = {
4041
u'dsn': None, # standard
@@ -72,7 +73,6 @@
7273
u'session_parameters': None, # snowflake internal
7374
u'autocommit': None, # snowflake
7475
u'numpy': False, # snowflake
75-
u'max_connection_pool': network.MAX_CONNECTION_POOL, # snowflake internal
7676
u'ocsp_response_cache_filename': None, # snowflake internal
7777
u'converter_class': SnowflakeConverter, # snowflake internal
7878
u'chunk_downloader_class': SnowflakeChunkDownloader, # snowflake internal
@@ -428,7 +428,6 @@ def __open_connection(self, mfa_callback, password_callback):
428428
connect_timeout=self._connect_timeout,
429429
request_timeout=self._request_timeout,
430430
injectClientPause=self._injectClientPause,
431-
max_connection_pool=self._max_connection_pool,
432431
connection=self)
433432
self.logger.debug(u'REST API object was created: %s:%s, proxy=%s:%s, '
434433
u'proxy_user=%s',

0 commit comments

Comments
 (0)