Skip to content

Commit 7cd4413

Browse files
committed
SNOW-27216: pluggable Chunk Loader for Python Connector.
1 parent fb04b88 commit 7cd4413

File tree

4 files changed

+85
-64
lines changed

4 files changed

+85
-64
lines changed

chunk_downloader.py

Lines changed: 64 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from .errorcode import (ER_NO_ADDITIONAL_CHUNK, ER_CHUNK_DOWNLOAD_FAILED)
1313
from .errors import (Error, OperationalError)
14-
from .network import (SnowflakeRestful, NO_TOKEN)
14+
from .network import (SnowflakeRestful, NO_TOKEN, MAX_CONNECTION_POOL)
1515

1616
DEFAULT_REQUEST_TIMEOUT = 300
1717
DEFAULT_CLIENT_RESULT_PREFETCH_SLOTS = 2
@@ -32,17 +32,18 @@
3232
'ready' # True if ready to consume or False
3333
])
3434

35+
logger = getLogger(__name__)
36+
3537

3638
class SnowflakeChunkDownloader(object):
3739
u"""
3840
Large Result set chunk downloader class.
3941
"""
4042

41-
def __init__(self, chunks, connection, cursor, qrmk, chunk_headers,
42-
prefetch_slots=DEFAULT_CLIENT_RESULT_PREFETCH_SLOTS,
43-
prefetch_threads=DEFAULT_CLIENT_RESULT_PREFETCH_THREADS,
44-
use_ijson=False):
45-
self.logger = getLogger(__name__)
43+
def _pre_init(self, chunks, connection, cursor, qrmk, chunk_headers,
44+
prefetch_slots=DEFAULT_CLIENT_RESULT_PREFETCH_SLOTS,
45+
prefetch_threads=DEFAULT_CLIENT_RESULT_PREFETCH_THREADS,
46+
use_ijson=False):
4647
self._use_ijson = use_ijson
4748
self._session = None
4849

@@ -70,22 +71,22 @@ def __init__(self, chunks, connection, cursor, qrmk, chunk_headers,
7071
self._chunk_size)
7172

7273
for idx, chunk in enumerate(chunks):
73-
self.logger.info(u"queued chunk: url=%s, rowCount=%s",
74-
chunk[u'url'], chunk[u'rowCount'])
74+
logger.info(u"queued chunk: url=%s, rowCount=%s",
75+
chunk[u'url'], chunk[u'rowCount'])
7576
self._chunks[idx] = SnowflakeChunk(
7677
url=chunk[u'url'],
7778
result_data=None,
7879
ready=False,
7980
row_count=int(chunk[u'rowCount']))
8081

81-
self.logger.debug(u'prefetch slots: %s, '
82-
u'prefetch threads: %s, '
83-
u'number of chunks: %s, '
84-
u'effective threads: %s',
85-
self._prefetch_slots,
86-
self._prefetch_threads,
87-
self._chunk_size,
88-
self._effective_threads)
82+
logger.debug(u'prefetch slots: %s, '
83+
u'prefetch threads: %s, '
84+
u'number of chunks: %s, '
85+
u'effective threads: %s',
86+
self._prefetch_slots,
87+
self._prefetch_threads,
88+
self._chunk_size,
89+
self._effective_threads)
8990

9091
self._pool = ThreadPool(self._effective_threads)
9192

@@ -94,6 +95,15 @@ def __init__(self, chunks, connection, cursor, qrmk, chunk_headers,
9495

9596
self._next_chunk_to_consume = 0
9697

98+
def __init__(self, chunks, connection, cursor, qrmk, chunk_headers,
99+
prefetch_slots=DEFAULT_CLIENT_RESULT_PREFETCH_SLOTS,
100+
prefetch_threads=DEFAULT_CLIENT_RESULT_PREFETCH_THREADS,
101+
use_ijson=False):
102+
self._pre_init(chunks, connection, cursor, qrmk, chunk_headers,
103+
prefetch_slots=prefetch_slots,
104+
prefetch_threads=prefetch_threads,
105+
use_ijson=use_ijson)
106+
logger.info('Chunk Downloader in memory')
97107
for idx in range(self._num_chunks_to_prefetch):
98108
self._pool.apply_async(self._download_chunk, [idx])
99109
self._chunk_locks[idx] = Condition()
@@ -103,47 +113,48 @@ def _download_chunk(self, idx):
103113
"""
104114
Downloads a chunk asynchronously
105115
"""
106-
self.logger.debug(u'downloading chunk %s/%s', idx, self._chunk_size)
116+
logger.debug(u'downloading chunk %s/%s', idx + 1, self._chunk_size)
107117
headers = {}
108118
try:
109119
if self._chunk_headers is not None:
110120
headers = self._chunk_headers
111-
self.logger.debug(u'use chunk headers from result')
121+
logger.debug(u'use chunk headers from result')
112122
elif self._qrmk is not None:
113123
headers[SSE_C_ALGORITHM] = SSE_C_AES
114124
headers[SSE_C_KEY] = self._qrmk
115125

116-
self.logger.debug(u"started getting the result set %s: %s",
117-
idx + 1, self._chunks[idx].url)
126+
logger.debug(u"started getting the result set %s: %s",
127+
idx + 1, self._chunks[idx].url)
118128
result_data = self._get_request(
119129
self._chunks[idx].url,
120-
headers)
121-
self.logger.debug(u"finished getting the result set %s: %s",
122-
idx + 1, self._chunks[idx].url)
130+
headers, max_connection_pool=self._effective_threads)
131+
logger.debug(u"finished getting the result set %s: %s",
132+
idx + 1, self._chunks[idx].url)
123133

124134
with self._chunk_locks[idx]:
125135
self._chunks[idx] = self._chunks[idx]._replace(
126136
result_data=result_data,
127137
ready=True)
128138
self._chunk_locks[idx].notify()
129-
self.logger.debug(
139+
logger.debug(
130140
u'added chunk %s/%s to a chunk list.', idx + 1,
131141
self._chunk_size)
132142
except Exception as e:
133-
self.logger.exception(
134-
u'Failed to fetch the large result set chunk')
143+
logger.exception(
144+
u'Failed to fetch the large result set chunk %s/%s',
145+
idx + 1, self._chunk_size)
135146
self._downloader_error = e
136147

137148
def next_chunk(self):
138149
"""
139150
Gets the next chunk if ready
140151
"""
141-
self.logger.debug(
152+
logger.debug(
142153
u'next_chunk_to_consume={next_chunk_to_consume}, '
143154
u'next_chunk_to_download={next_chunk_to_download}, '
144155
u'total_chunks={total_chunks}'.format(
145-
next_chunk_to_consume=self._next_chunk_to_consume,
146-
next_chunk_to_download=self._next_chunk_to_download,
156+
next_chunk_to_consume=self._next_chunk_to_consume + 1,
157+
next_chunk_to_download=self._next_chunk_to_download + 1,
147158
total_chunks=self._chunk_size))
148159
if self._next_chunk_to_consume > 0:
149160
# clean up the previously fetched data and lock
@@ -169,12 +180,12 @@ def next_chunk(self):
169180
raise self._downloader_error
170181

171182
for attempt in range(MAX_RETRY_DOWNLOAD):
172-
self.logger.debug(u'waiting for chunk %s/%s'
173-
u' in %s/%s download attempt',
174-
self._next_chunk_to_consume + 1,
175-
self._chunk_size,
176-
attempt + 1,
177-
MAX_RETRY_DOWNLOAD)
183+
logger.debug(u'waiting for chunk %s/%s'
184+
u' in %s/%s download attempt',
185+
self._next_chunk_to_consume + 1,
186+
self._chunk_size,
187+
attempt + 1,
188+
MAX_RETRY_DOWNLOAD)
178189
done = False
179190
for wait_counter in range(MAX_WAIT):
180191
with self._chunk_locks[self._next_chunk_to_consume]:
@@ -184,16 +195,16 @@ def next_chunk(self):
184195
self._downloader_error is not None:
185196
done = True
186197
break
187-
self.logger.debug(u'chunk %s/%s is NOT ready to consume'
188-
u' in %s/%s(s)',
189-
self._next_chunk_to_consume + 1,
190-
self._chunk_size,
191-
(wait_counter + 1) * WAIT_TIME_IN_SECONDS,
192-
MAX_WAIT * WAIT_TIME_IN_SECONDS)
198+
logger.debug(u'chunk %s/%s is NOT ready to consume'
199+
u' in %s/%s(s)',
200+
self._next_chunk_to_consume + 1,
201+
self._chunk_size,
202+
(wait_counter + 1) * WAIT_TIME_IN_SECONDS,
203+
MAX_WAIT * WAIT_TIME_IN_SECONDS)
193204
self._chunk_locks[self._next_chunk_to_consume].wait(
194205
WAIT_TIME_IN_SECONDS)
195206
else:
196-
self.logger.debug(
207+
logger.debug(
197208
u'chunk %s/%s is still NOT ready. Restarting chunk '
198209
u'downloader threads',
199210
self._next_chunk_to_consume + 1,
@@ -216,9 +227,9 @@ def next_chunk(self):
216227
u'unknown reason.',
217228
u'errno': ER_CHUNK_DOWNLOAD_FAILED
218229
})
219-
self.logger.debug(u'chunk %s/%s is ready to consume',
220-
self._next_chunk_to_consume + 1,
221-
self._chunk_size)
230+
logger.debug(u'chunk %s/%s is ready to consume',
231+
self._next_chunk_to_consume + 1,
232+
self._chunk_size)
222233

223234
ret = self._chunks[self._next_chunk_to_consume]
224235
self._next_chunk_to_consume += 1
@@ -243,7 +254,11 @@ def __del__(self):
243254
# ignore all errors in the destructor
244255
pass
245256

246-
def _get_request(self, url, headers, retry=10):
257+
def _get_request(
258+
self, url, headers,
259+
is_raw_binary_iterator=True,
260+
max_connection_pool=MAX_CONNECTION_POOL,
261+
retry=10):
247262
"""
248263
GET request for Large Result set chunkloader
249264
"""
@@ -254,7 +269,7 @@ def _get_request(self, url, headers, retry=10):
254269
self._connection.rest._proxy_user,
255270
self._connection.rest._proxy_password)
256271

257-
self.logger.debug(u'proxies=%s, url=%s', proxies, url)
272+
logger.debug(u'proxies=%s, url=%s', proxies, url)
258273

259274
return SnowflakeRestful.access_url(
260275
self._connection,
@@ -270,4 +285,6 @@ def _get_request(self, url, headers, retry=10):
270285
retry=retry,
271286
token=NO_TOKEN,
272287
is_raw_binary=True,
288+
is_raw_binary_iterator=is_raw_binary_iterator,
289+
max_connection_pool=max_connection_pool,
273290
use_ijson=self._use_ijson)

connection.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from . import errors
1414
from . import network
15+
from .chunk_downloader import SnowflakeChunkDownloader
1516
from .compat import (TO_UNICODE, IS_OLD_PYTHON, urlencode, PY2)
1617
from .converter import SnowflakeConverter
1718
from .cursor import SnowflakeCursor
@@ -41,38 +42,38 @@
4142
u'password': u'', # standard
4243
u'host': u'127.0.0.1', # standard
4344
u'port': 8080, # standard
45+
u'database': None, # standard
4446
u'proxy_host': None, # snowflake
4547
u'proxy_port': None, # snowflake
4648
u'proxy_user': None, # snowflake
4749
u'proxy_password': None, # snowflake
48-
u'database': None, # standard
49-
u'protocol': u'http', # support http/https
50+
u'protocol': u'http', # snowflake
5051
u'warehouse': None, # snowflake
5152
u'region': None, # snowflake
5253
u'account': None, # snowflake
5354
u'schema': None, # snowflake
5455
u'role': None, # snowflake
55-
u'session_id': None,
56+
u'session_id': None, # snowflake
5657
u'connect_timeout': None, # connection timeout
5758
u'request_timeout': None, # request timeout
5859
u'passcode_in_password': False, # Snowflake MFA
5960
u'passcode': None, # Snowflake MFA
6061
u'authenticator': network.DEFAULT_AUTHENTICATOR,
6162
u'mfa_callback': None,
6263
u'password_callback': None,
63-
# Snowflake Federated Authentication
6464
u'application': network.CLIENT_NAME,
6565
u'internal_application_name': network.CLIENT_NAME,
6666
u'internal_application_version': network.CLIENT_VERSION,
6767

6868
u'insecure_mode': False, # Error security fix requirement
6969
u'injectClientPause': 0, # snowflake internal
70-
u'session_parameters': None,
71-
u'autocommit': None,
72-
u'numpy': False,
73-
u'max_connection_pool': network.MAX_CONNECTION_POOL,
74-
u'ocsp_response_cache_filename': None,
75-
u'converter_class': SnowflakeConverter,
70+
u'session_parameters': None, # snowflake internal
71+
u'autocommit': None, # snowflake
72+
u'numpy': False, # snowflake
73+
u'max_connection_pool': network.MAX_CONNECTION_POOL, # snowflake internal
74+
u'ocsp_response_cache_filename': None, # snowflake internal
75+
u'converter_class': SnowflakeConverter, # snowflake internal
76+
u'chunk_downloader_class': SnowflakeChunkDownloader, # snowflake internal
7677
}
7778

7879
APPLICATION_RE = re.compile(r'[\w\d_]+')

cursor.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313

1414
from six import u
1515

16-
from .chunk_downloader import (SnowflakeChunkDownloader,
17-
DEFAULT_CLIENT_RESULT_PREFETCH_SLOTS,
16+
from .chunk_downloader import (DEFAULT_CLIENT_RESULT_PREFETCH_SLOTS,
1817
DEFAULT_CLIENT_RESULT_PREFETCH_THREADS)
1918
from .compat import (BASE_EXCEPTION_CLASS)
2019
from .constants import (FIELD_NAME_TO_ID, FIELD_ID_TO_NAME)
@@ -541,7 +540,7 @@ def chunk_info(self, data, use_ijson=False):
541540
header_value)
542541

543542
self.logger.debug(u'qrmk=%s', qrmk)
544-
self._chunk_downloader = SnowflakeChunkDownloader(
543+
self._chunk_downloader = self._connection._chunk_downloader_class(
545544
chunks, self._connection, self, qrmk, chunk_headers,
546545
prefetch_slots=self._client_result_prefetch_slots,
547546
prefetch_threads=self._client_result_prefetch_threads,

network.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ def access_url(conn, session_context, method, full_url, headers, data,
609609
is_raw_text=False,
610610
catch_okta_unauthorized_error=False,
611611
is_raw_binary=False,
612+
is_raw_binary_iterator=True,
612613
max_connection_pool=MAX_CONNECTION_POOL,
613614
use_ijson=False):
614615
logger = getLogger(__name__)
@@ -671,7 +672,9 @@ def request_thread(result_queue):
671672
raw_data = decompress_raw_data(
672673
raw_ret.raw, add_bracket=True
673674
).decode('utf-8', 'replace')
674-
if not use_ijson:
675+
if not is_raw_binary_iterator:
676+
ret = json.loads(raw_data)
677+
elif not use_ijson:
675678
ret = iter(json.loads(raw_data))
676679
else:
677680
ret = split_rows_from_stream(StringIO(raw_data))
@@ -744,15 +747,15 @@ def request_thread(result_queue):
744747
request_result_queue = Queue()
745748
request_thread(request_result_queue)
746749
try:
747-
# don't care about the return value, because no rety and
750+
# don't care about the return value, because no retry and
748751
# no error will show up
749752
_, _ = request_result_queue.get(timeout=request_timeout)
750753
except:
751754
pass
752755

753756
sleeping_time = 1
757+
return_object = None
754758
for retry_cnt in range(retry):
755-
return_object = None
756759
request_result_queue = Queue()
757760
th = Thread(name='request_thread', target=request_thread,
758761
args=(request_result_queue,))
@@ -764,7 +767,7 @@ def request_thread(result_queue):
764767
th.join(timeout=request_timeout)
765768
logger.debug('request thread joined')
766769
return_object, retryable = request_result_queue.get(
767-
timeout=request_timeout)
770+
timeout=int(request_timeout / 2))
768771
logger.debug('request thread returned object')
769772
if retryable:
770773
raise RequestRetry()
@@ -797,6 +800,7 @@ def request_thread(result_queue):
797800
retry,
798801
sleeping_time)
799802
time.sleep(sleeping_time)
803+
return_object = None
800804

801805
return return_object
802806

0 commit comments

Comments
 (0)