Skip to content

Commit bb3dc64

Browse files
committed
SNOW-1654538: asyncio download timeout setting (#2063)
1 parent 5f8f235 commit bb3dc64

File tree

2 files changed

+58
-62
lines changed

2 files changed

+58
-62
lines changed

src/snowflake/connector/aio/_result_batch.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,7 @@
2929
get_http_retryable_error,
3030
is_retryable_http_code,
3131
)
32-
from snowflake.connector.result_batch import (
33-
MAX_DOWNLOAD_RETRY,
34-
SSE_C_AES,
35-
SSE_C_ALGORITHM,
36-
SSE_C_KEY,
37-
)
32+
from snowflake.connector.result_batch import SSE_C_AES, SSE_C_ALGORITHM, SSE_C_KEY
3833
from snowflake.connector.result_batch import ArrowResultBatch as ArrowResultBatchSync
3934
from snowflake.connector.result_batch import DownloadMetrics
4035
from snowflake.connector.result_batch import JSONResultBatch as JSONResultBatchSync
@@ -52,8 +47,13 @@
5247

5348
logger = getLogger(__name__)
5449

50+
# we redefine the DOWNLOAD_TIMEOUT and MAX_DOWNLOAD_RETRY for async version on purpose
51+
# because download in sync and async are different in nature and may require separate tuning
52+
# also be aware that currently _result_batch is a private module so these values are not exposed to users directly
53+
DOWNLOAD_TIMEOUT = None
54+
MAX_DOWNLOAD_RETRY = 10
55+
5556

56-
# TODO: consolidate this with the sync version
5757
def create_batches_from_response(
5858
cursor: SnowflakeCursor,
5959
_format: str,
@@ -212,19 +212,27 @@ async def download_chunk(http_session):
212212
return response, content, encoding
213213

214214
content, encoding = None, None
215-
for retry in range(MAX_DOWNLOAD_RETRY):
215+
for retry in range(max(MAX_DOWNLOAD_RETRY, 1)):
216216
try:
217-
# TODO: feature parity with download timeout setting, in sync it's set to 7s
218-
# but in async we schedule multiple tasks at the same time so some tasks might
219-
# take longer than 7s to finish which is expected
217+
220218
async with TimerContextManager() as download_metric:
221219
logger.debug(f"started downloading result batch id: {self.id}")
222220
chunk_url = self._remote_chunk_info.url
223221
request_data = {
224222
"url": chunk_url,
225223
"headers": self._chunk_headers,
226-
# "timeout": DOWNLOAD_TIMEOUT,
227224
}
225+
# timeout setting for download is different from the sync version which has an
226+
# empirical value 7 seconds. It is difficult to measure this empirical value in async
227+
# as we maximize the network throughput by downloading multiple chunks at the same time compared
228+
# to the sync version that the overall throughput is constrained by the number of
229+
# prefetch threads -- in asyncio we see great download performance improvement.
230+
# if DOWNLOAD_TIMEOUT is not set, by default the aiohttp session timeout comes into effect
231+
# which originates from the connection config.
232+
if DOWNLOAD_TIMEOUT:
233+
request_data["timeout"] = aiohttp.ClientTimeout(
234+
total=DOWNLOAD_TIMEOUT
235+
)
228236
# Try to reuse a connection if possible
229237
if connection and connection._rest is not None:
230238
async with connection._rest._use_requests_session() as session:

test/integ/aio/test_cursor_async.py

Lines changed: 38 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
import pickle
1414
import time
1515
from datetime import date, datetime, timezone
16-
from typing import TYPE_CHECKING, NamedTuple
1716
from unittest import mock
1817

1918
import pytest
2019
import pytz
2120

2221
import snowflake.connector
22+
import snowflake.connector.aio
2323
from snowflake.connector import (
2424
InterfaceError,
2525
NotSupportedError,
@@ -30,64 +30,31 @@
3030
errors,
3131
)
3232
from snowflake.connector.aio import DictCursor, SnowflakeCursor
33+
from snowflake.connector.aio._result_batch import (
34+
ArrowResultBatch,
35+
JSONResultBatch,
36+
ResultBatch,
37+
)
3338
from snowflake.connector.compat import IS_WINDOWS
34-
35-
try:
36-
from snowflake.connector.cursor import ResultMetadata
37-
except ImportError:
38-
39-
class ResultMetadata(NamedTuple):
40-
name: str
41-
type_code: int
42-
display_size: int
43-
internal_size: int
44-
precision: int
45-
scale: int
46-
is_nullable: bool
47-
48-
49-
import snowflake.connector.aio
39+
from snowflake.connector.constants import (
40+
FIELD_ID_TO_NAME,
41+
PARAMETER_MULTI_STATEMENT_COUNT,
42+
PARAMETER_PYTHON_CONNECTOR_QUERY_RESULT_FORMAT,
43+
QueryStatus,
44+
)
45+
from snowflake.connector.cursor import ResultMetadata
5046
from snowflake.connector.description import CLIENT_VERSION
5147
from snowflake.connector.errorcode import (
5248
ER_FAILED_TO_REWRITE_MULTI_ROW_INSERT,
49+
ER_NO_ARROW_RESULT,
50+
ER_NO_PYARROW,
51+
ER_NO_PYARROW_SNOWSQL,
5352
ER_NOT_POSITIVE_SIZE,
5453
)
5554
from snowflake.connector.errors import Error
5655
from snowflake.connector.sqlstate import SQLSTATE_FEATURE_NOT_SUPPORTED
5756
from snowflake.connector.telemetry import TelemetryField
58-
59-
try:
60-
from snowflake.connector.util_text import random_string
61-
except ImportError:
62-
from ..randomize import random_string
63-
64-
try:
65-
from snowflake.connector.aio._result_batch import ArrowResultBatch, JSONResultBatch
66-
from snowflake.connector.constants import (
67-
FIELD_ID_TO_NAME,
68-
PARAMETER_MULTI_STATEMENT_COUNT,
69-
PARAMETER_PYTHON_CONNECTOR_QUERY_RESULT_FORMAT,
70-
)
71-
from snowflake.connector.errorcode import (
72-
ER_NO_ARROW_RESULT,
73-
ER_NO_PYARROW,
74-
ER_NO_PYARROW_SNOWSQL,
75-
)
76-
except ImportError:
77-
PARAMETER_PYTHON_CONNECTOR_QUERY_RESULT_FORMAT = None
78-
ER_NO_ARROW_RESULT = None
79-
ER_NO_PYARROW = None
80-
ER_NO_PYARROW_SNOWSQL = None
81-
ArrowResultBatch = JSONResultBatch = None
82-
FIELD_ID_TO_NAME = {}
83-
84-
if TYPE_CHECKING: # pragma: no cover
85-
from snowflake.connector.result_batch import ResultBatch
86-
87-
try: # pragma: no cover
88-
from snowflake.connector.constants import QueryStatus
89-
except ImportError:
90-
QueryStatus = None
57+
from snowflake.connector.util_text import random_string
9158

9259

9360
@pytest.fixture
@@ -1824,3 +1791,24 @@ async def test_decoding_utf8_for_json_result(conn_cnx):
18241791
)
18251792
with pytest.raises(Error):
18261793
await result_batch._load("À".encode("latin1"), "latin1")
1794+
1795+
1796+
async def test_fetch_download_timeout_setting(conn_cnx):
1797+
with mock.patch.multiple(
1798+
"snowflake.connector.aio._result_batch",
1799+
DOWNLOAD_TIMEOUT=0.001,
1800+
MAX_DOWNLOAD_RETRY=2,
1801+
):
1802+
sql = "SELECT seq4(), uniform(1, 10, RANDOM(12)) FROM TABLE(GENERATOR(ROWCOUNT => 100000)) v"
1803+
async with conn_cnx() as con, con.cursor() as cur:
1804+
with pytest.raises(asyncio.TimeoutError):
1805+
await (await cur.execute(sql)).fetchall()
1806+
1807+
with mock.patch.multiple(
1808+
"snowflake.connector.aio._result_batch",
1809+
DOWNLOAD_TIMEOUT=10,
1810+
MAX_DOWNLOAD_RETRY=1,
1811+
):
1812+
sql = "SELECT seq4(), uniform(1, 10, RANDOM(12)) FROM TABLE(GENERATOR(ROWCOUNT => 100000)) v"
1813+
async with conn_cnx() as con, con.cursor() as cur:
1814+
assert len(await (await cur.execute(sql)).fetchall()) == 100000

0 commit comments

Comments (0)