Skip to content

Commit 94bc145

Browse files
SNOW-2028051 introduce a new client_fetch_threads connection parameter to decouple threads number limitations on fetching and pre-fetching (#2255)
1 parent c4084bf commit 94bc145

File tree

4 files changed

+25
-1
lines changed

4 files changed

+25
-1
lines changed

DESCRIPTION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
2222
- Lower log levels from info to debug for some of the messages to make the output easier to follow.
2323
- Allow the connector to inherit a UUID4 generated upstream, provided in statement parameters (field: `requestId`), rather than automatically generate a UUID4 to use for the HTTP Request ID.
2424
- Fix expired S3 credentials update and increment retry when expired credentials are found.
25+
- Added `client_fetch_threads` experimental parameter to better utilize threads for fetching query results.
2526

2627
- v3.14.0(March 03, 2025)
2728
- Bumped pyOpenSSL dependency upper boundary from <25.0.0 to <26.0.0.

src/snowflake/connector/connection.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@
125125

126126
DEFAULT_CLIENT_PREFETCH_THREADS = 4
127127
MAX_CLIENT_PREFETCH_THREADS = 10
128+
MAX_CLIENT_FETCH_THREADS = 1024
128129
DEFAULT_BACKOFF_POLICY = exponential_backoff()
129130

130131

@@ -228,6 +229,7 @@ def _get_private_bytes_from_file(
228229
(type(None), int),
229230
), # snowflake
230231
"client_prefetch_threads": (4, int), # snowflake
232+
"client_fetch_threads": (None, (type(None), int)),
231233
"numpy": (False, bool), # snowflake
232234
"ocsp_response_cache_filename": (None, (type(None), str)), # snowflake internal
233235
"converter_class": (DefaultConverterClass(), SnowflakeConverter),
@@ -417,6 +419,7 @@ class SnowflakeConnection:
417419
See the backoff_policies module for details and implementation examples.
418420
client_session_keep_alive_heartbeat_frequency: Heartbeat frequency to keep connection alive in seconds.
419421
client_prefetch_threads: Number of threads to download the result set.
422+
client_fetch_threads: Number of threads to fetch staged query results.
420423
rest: Snowflake REST API object. Internal use only. Maybe removed in a later release.
421424
application: Application name to communicate with Snowflake as. By default, this is "PythonConnector".
422425
errorhandler: Handler used with errors. By default, an exception will be raised on error.
@@ -681,6 +684,16 @@ def client_prefetch_threads(self, value) -> None:
681684
self._client_prefetch_threads = value
682685
self._validate_client_prefetch_threads()
683686

687+
@property
688+
def client_fetch_threads(self) -> int | None:
689+
return self._client_fetch_threads
690+
691+
@client_fetch_threads.setter
692+
def client_fetch_threads(self, value: None | int) -> None:
693+
if value is not None:
694+
value = min(max(1, value), MAX_CLIENT_FETCH_THREADS)
695+
self._client_fetch_threads = value
696+
684697
@property
685698
def rest(self) -> SnowflakeRestful | None:
686699
return self._rest

src/snowflake/connector/cursor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1208,7 +1208,8 @@ def _init_result_and_meta(self, data: dict[Any, Any]) -> None:
12081208
self._result_set = ResultSet(
12091209
self,
12101210
result_chunks,
1211-
self._connection.client_prefetch_threads,
1211+
self._connection.client_fetch_threads
1212+
or self._connection.client_prefetch_threads,
12121213
)
12131214
self._rownumber = -1
12141215
self._result_state = ResultState.VALID

test/integ/test_connection.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,6 +1141,15 @@ def test_client_prefetch_threads_setting(conn_cnx):
11411141
assert conn.client_prefetch_threads == new_thread_count
11421142

11431143

1144+
@pytest.mark.skipolddriver
1145+
def test_client_fetch_threads_setting(conn_cnx):
1146+
"""Tests whether client_fetch_threads is None by default and setting the parameter has effect."""
1147+
with conn_cnx() as conn:
1148+
assert conn.client_fetch_threads is None
1149+
conn.client_fetch_threads = 32
1150+
assert conn.client_fetch_threads == 32
1151+
1152+
11441153
@pytest.mark.external
11451154
def test_client_failover_connection_url(conn_cnx):
11461155
with conn_cnx("client_failover") as conn:

0 commit comments

Comments
 (0)