Skip to content

Commit 63d182e

Browse files
sfc-gh-stakedaankit-bhatnagar167
authored and committed
SNOW-105228: Pandas fetch API did not correctly handle the case where the first chunk is empty
1 parent 22d19de commit 63d182e

File tree

5 files changed

+358
-19
lines changed

5 files changed

+358
-19
lines changed

arrow_iterator.pyx

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ from libcpp cimport bool as c_bool
1212
from libcpp.memory cimport shared_ptr
1313
from libcpp.string cimport string as c_string
1414
from libcpp.vector cimport vector
15+
from .errors import (Error, OperationalError)
16+
from .errorcode import ER_FAILED_TO_READ_ARROW_STREAM
1517

1618
logger = getLogger(__name__)
1719

@@ -140,15 +142,36 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
140142
cdef shared_ptr[CRecordBatchReader] reader
141143
cdef shared_ptr[CRecordBatch] record_batch
142144
input_stream.reset(new PyReadableFile(py_inputstream))
143-
CRecordBatchStreamReader.Open(input_stream.get(), &reader)
145+
cdef CStatus ret = CRecordBatchStreamReader.Open(input_stream.get(), &reader)
146+
if not ret.ok():
147+
Error.errorhandler_wrapper(
148+
None,
149+
None,
150+
OperationalError,
151+
{
152+
u'msg': u'Failed to open arrow stream: ' + ret.message(),
153+
u'errno': ER_FAILED_TO_READ_ARROW_STREAM
154+
})
155+
144156
while True:
145-
reader.get().ReadNext(&record_batch)
157+
ret = reader.get().ReadNext(&record_batch)
158+
if not ret.ok():
159+
Error.errorhandler_wrapper(
160+
None,
161+
None,
162+
OperationalError,
163+
{
164+
u'msg': u'Failed to read next arrow batch: ' + ret.message(),
165+
u'errno': ER_FAILED_TO_READ_ARROW_STREAM
166+
})
146167

147168
if record_batch.get() is NULL:
148169
break
149170

150171
self.batches.push_back(record_batch)
151172

173+
logger.debug("Batches read: %d", self.batches.size())
174+
152175
self.context = arrow_context
153176
self.cIterator = NULL
154177
self.unit = ''

arrow_result.pyx

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ cdef class ArrowResult:
3636
object _arrow_context
3737
str _iter_unit
3838

39-
def __init__(self, raw_response, cursor):
39+
def __init__(self, raw_response, cursor, _chunk_downloader=None):
4040
self._reset()
4141
self._cursor = cursor
4242
self._connection = cursor.connection
43-
self._chunk_info(raw_response)
43+
self._chunk_info(raw_response, _chunk_downloader)
4444

45-
def _chunk_info(self, data):
45+
def _chunk_info(self, data, _chunk_downloader=None):
4646
self.total_row_index = -1 # last fetched number of rows
4747

4848
self._chunk_index = 0
@@ -55,6 +55,7 @@ cdef class ArrowResult:
5555
self._arrow_context = ArrowConverterContext(self._connection._session_parameters)
5656
self._current_chunk_row = PyArrowIterator(io.BytesIO(arrow_bytes), self._arrow_context)
5757
else:
58+
logger.debug("Data from first gs response is empty")
5859
self._current_chunk_row = EmptyPyArrowIterator(None, None)
5960
self._iter_unit = EMPTY_UNIT
6061

@@ -76,11 +77,12 @@ cdef class ArrowResult:
7677
header_value)
7778

7879
logger.debug(u'qrmk=%s', qrmk)
79-
self._chunk_downloader = self._connection._chunk_downloader_class(
80-
chunks, self._connection, self._cursor, qrmk, chunk_headers,
81-
query_result_format='arrow',
82-
prefetch_threads=self._connection.client_prefetch_threads,
83-
use_ijson=False)
80+
self._chunk_downloader = _chunk_downloader if _chunk_downloader \
81+
else self._connection._chunk_downloader_class(
82+
chunks, self._connection, self._cursor, qrmk, chunk_headers,
83+
query_result_format='arrow',
84+
prefetch_threads=self._connection.client_prefetch_threads,
85+
use_ijson=False)
8486

8587
def __iter__(self):
8688
return self
@@ -171,9 +173,16 @@ cdef class ArrowResult:
171173
raise RuntimeError
172174

173175
try:
174-
self._current_chunk_row.init(self._iter_unit) # AttributeError if it is iter(())
176+
self._current_chunk_row.init(self._iter_unit)
177+
logger.debug(u'Init table iterator successfully, current chunk index: %s, '
178+
u'chunk count: %s', self._chunk_index, self._chunk_count)
175179
while self._chunk_index <= self._chunk_count:
176-
table = self._current_chunk_row.__next__()
180+
stop_iteration_except = False
181+
try:
182+
table = self._current_chunk_row.__next__()
183+
except StopIteration:
184+
stop_iteration_except = True
185+
177186
if self._chunk_index < self._chunk_count: # multiple chunks
178187
logger.debug(
179188
u"chunk index: %s, chunk_count: %s",
@@ -182,7 +191,11 @@ cdef class ArrowResult:
182191
self._current_chunk_row = next_chunk.result_data
183192
self._current_chunk_row.init(self._iter_unit)
184193
self._chunk_index += 1
185-
yield table
194+
195+
if stop_iteration_except:
196+
continue
197+
else:
198+
yield table
186199
else:
187200
if self._chunk_count > 0 and \
188201
self._chunk_downloader is not None:
@@ -196,9 +209,6 @@ cdef class ArrowResult:
196209
self._chunk_downloader = None
197210
self._chunk_count = 0
198211
self._current_chunk_row = EmptyPyArrowIterator(None, None)
199-
except AttributeError:
200-
# just for handling the case of empty result
201-
return None
202212
finally:
203213
if self._cursor._first_chunk_time:
204214
logger.info("fetching data into pandas dataframe done")

cursor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ def check_can_use_arrow_resultset(self):
652652
}
653653
)
654654

655-
def check_can_use_panadas(self):
655+
def check_can_use_pandas(self):
656656
global pyarrow
657657

658658
if pyarrow is None:
@@ -707,7 +707,7 @@ def fetch_pandas_batches(self, **kwargs):
707707
Fetch Pandas dataframes in batches, where 'batch' refers to Snowflake Chunk
708708
@param kwargs: will be passed to pyarrow.Table.to_pandas() method
709709
"""
710-
self.check_can_use_panadas()
710+
self.check_can_use_pandas()
711711
if self._query_result_format != 'arrow': # TODO: or pandas isn't imported
712712
raise NotSupportedError
713713
for df in self._result._fetch_pandas_batches(**kwargs):
@@ -718,7 +718,7 @@ def fetch_pandas_all(self, **kwargs):
718718
Fetch a single Pandas dataframe containing all result rows
719719
@param kwargs: will be passed to pyarrow.Table.to_pandas() method
720720
"""
721-
self.check_can_use_panadas()
721+
self.check_can_use_pandas()
722722
if self._query_result_format != 'arrow':
723723
raise NotSupportedError
724724
return self._result._fetch_pandas_all(**kwargs)

errorcode.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,4 @@
7575
ER_NO_PYARROW = 255002
7676
ER_NO_ARROW_RESULT = 255003
7777
ER_NO_PYARROW_SNOWSQL = 255004
78+
ER_FAILED_TO_READ_ARROW_STREAM = 255005

0 commit comments

Comments
 (0)