Skip to content

Commit 1352160

Browse files
sfc-gh-mmishchenkosfc-gh-pczajka
authored andcommitted
SNOW-2028051 introduce a new client_fetch_threads connection parameter to decouple threads number limitations on fetching and pre-fetching (#2255)
1 parent bec3f66 commit 1352160

File tree

3 files changed

+24
-1
lines changed

3 files changed

+24
-1
lines changed

src/snowflake/connector/connection.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120

121121
DEFAULT_CLIENT_PREFETCH_THREADS = 4
122122
MAX_CLIENT_PREFETCH_THREADS = 10
123+
MAX_CLIENT_FETCH_THREADS = 1024
123124
DEFAULT_BACKOFF_POLICY = exponential_backoff()
124125

125126

@@ -222,6 +223,7 @@ def _get_private_bytes_from_file(
222223
(type(None), int),
223224
), # snowflake
224225
"client_prefetch_threads": (4, int), # snowflake
226+
"client_fetch_threads": (None, (type(None), int)),
225227
"numpy": (False, bool), # snowflake
226228
"ocsp_response_cache_filename": (None, (type(None), str)), # snowflake internal
227229
"converter_class": (DefaultConverterClass(), SnowflakeConverter),
@@ -380,6 +382,7 @@ class SnowflakeConnection:
380382
See the backoff_policies module for details and implementation examples.
381383
client_session_keep_alive_heartbeat_frequency: Heartbeat frequency to keep connection alive in seconds.
382384
client_prefetch_threads: Number of threads to download the result set.
385+
client_fetch_threads: Number of threads to fetch staged query results.
383386
rest: Snowflake REST API object. Internal use only. Maybe removed in a later release.
384387
application: Application name to communicate with Snowflake as. By default, this is "PythonConnector".
385388
errorhandler: Handler used with errors. By default, an exception will be raised on error.
@@ -639,6 +642,16 @@ def client_prefetch_threads(self, value) -> None:
639642
self._client_prefetch_threads = value
640643
self._validate_client_prefetch_threads()
641644

645+
@property
646+
def client_fetch_threads(self) -> int | None:
647+
return self._client_fetch_threads
648+
649+
@client_fetch_threads.setter
650+
def client_fetch_threads(self, value: None | int) -> None:
651+
if value is not None:
652+
value = min(max(1, value), MAX_CLIENT_FETCH_THREADS)
653+
self._client_fetch_threads = value
654+
642655
@property
643656
def rest(self) -> SnowflakeRestful | None:
644657
return self._rest

src/snowflake/connector/cursor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,8 @@ def _init_result_and_meta(self, data: dict[Any, Any]) -> None:
11861186
self._result_set = ResultSet(
11871187
self,
11881188
result_chunks,
1189-
self._connection.client_prefetch_threads,
1189+
self._connection.client_fetch_threads
1190+
or self._connection.client_prefetch_threads,
11901191
)
11911192
self._rownumber = -1
11921193
self._result_state = ResultState.VALID

test/integ/test_connection.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,15 @@ def test_client_prefetch_threads_setting(conn_cnx):
11441144
assert conn.client_prefetch_threads == new_thread_count
11451145

11461146

1147+
@pytest.mark.skipolddriver
1148+
def test_client_fetch_threads_setting(conn_cnx):
1149+
"""Tests whether client_fetch_threads is None by default and setting the parameter has effect."""
1150+
with conn_cnx() as conn:
1151+
assert conn.client_fetch_threads is None
1152+
conn.client_fetch_threads = 32
1153+
assert conn.client_fetch_threads == 32
1154+
1155+
11471156
@pytest.mark.external
11481157
def test_client_failover_connection_url(conn_cnx):
11491158
with conn_cnx("client_failover") as conn:

0 commit comments

Comments
 (0)