Skip to content

Commit 3f1ed6e

Browse files
sfc-gh-stakedaankit-bhatnagar167
authored andcommitted
SNOW-119349: handle year out of range in arrow format
1 parent 45bb067 commit 3f1ed6e

11 files changed

+92
-46
lines changed

arrow_iterator.pyx

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ from libcpp cimport bool as c_bool
1212
from libcpp.memory cimport shared_ptr
1313
from libcpp.string cimport string as c_string
1414
from libcpp.vector cimport vector
15-
from .errors import (Error, OperationalError)
16-
from .errorcode import ER_FAILED_TO_READ_ARROW_STREAM
15+
from .errors import (Error, OperationalError, InterfaceError)
16+
from .errorcode import (ER_FAILED_TO_READ_ARROW_STREAM, ER_FAILED_TO_CONVERT_ROW_TO_PYTHON_TYPE)
1717

1818
logger = getLogger(__name__)
1919

@@ -27,8 +27,13 @@ ROW_UNIT, TABLE_UNIT, EMPTY_UNIT = 'row', 'table', ''
2727

2828

2929
cdef extern from "cpp/ArrowIterator/CArrowIterator.hpp" namespace "sf":
30+
cdef cppclass ReturnVal:
31+
PyObject * successObj;
32+
33+
PyObject * exception;
34+
3035
cdef cppclass CArrowIterator:
31-
PyObject* next();
36+
shared_ptr[ReturnVal] next();
3237

3338

3439
cdef extern from "cpp/ArrowIterator/CArrowChunkIterator.hpp" namespace "sf":
@@ -132,20 +137,21 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
132137
cdef object context
133138
cdef CArrowIterator* cIterator
134139
cdef str unit
135-
cdef PyObject* cret
140+
cdef shared_ptr[ReturnVal] cret
136141
cdef vector[shared_ptr[CRecordBatch]] batches
137142
cdef object use_dict_result
143+
cdef object cursor
138144

139-
def __cinit__(self, object py_inputstream, object arrow_context, object use_dict_result):
145+
def __cinit__(self, object cursor, object py_inputstream, object arrow_context, object use_dict_result):
140146
cdef shared_ptr[InputStream] input_stream
141147
cdef shared_ptr[CRecordBatchReader] reader
142148
cdef shared_ptr[CRecordBatch] record_batch
143149
input_stream.reset(new PyReadableFile(py_inputstream))
144150
cdef CStatus ret = CRecordBatchStreamReader.Open(input_stream.get(), &reader)
145151
if not ret.ok():
146152
Error.errorhandler_wrapper(
147-
None,
148-
None,
153+
cursor.connection,
154+
cursor,
149155
OperationalError,
150156
{
151157
u'msg': u'Failed to open arrow stream: ' + ret.message(),
@@ -156,8 +162,8 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
156162
ret = reader.get().ReadNext(&record_batch)
157163
if not ret.ok():
158164
Error.errorhandler_wrapper(
159-
None,
160-
None,
165+
cursor.connection,
166+
cursor,
161167
OperationalError,
162168
{
163169
u'msg': u'Failed to read next arrow batch: ' + ret.message(),
@@ -175,18 +181,24 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
175181
self.cIterator = NULL
176182
self.unit = ''
177183
self.use_dict_result = use_dict_result
184+
self.cursor = cursor
178185

179186
def __dealloc__(self):
180187
del self.cIterator
181188

182189
def __next__(self):
183190
self.cret = self.cIterator.next()
184191

185-
if not self.cret:
186-
logger.error("Internal error from CArrowIterator\n")
192+
if not self.cret.get().successObj:
193+
msg = u'Failed to convert current row, cause: ' + str(<object>self.cret.get().exception)
194+
Error.errorhandler_wrapper(self.cursor.connection, self.cursor, InterfaceError,
195+
{
196+
u'msg': msg,
197+
u'errno': ER_FAILED_TO_CONVERT_ROW_TO_PYTHON_TYPE
198+
})
187199
# it looks like this line can help us get into python and detect the global variable immediately
188200
# however, this log will not show up for unclear reason
189-
ret = <object>self.cret
201+
ret = <object>self.cret.get().successObj
190202

191203
if ret is None:
192204
raise StopIteration

arrow_result.pyx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
# cython: language_level=3
77

88
from base64 import b64decode
9-
from libcpp cimport bool
109
import io
1110
from logging import getLogger
1211
from .telemetry import TelemetryField
@@ -57,7 +56,8 @@ cdef class ArrowResult:
5756
if rowset_b64:
5857
arrow_bytes = b64decode(rowset_b64)
5958
self._arrow_context = ArrowConverterContext(self._connection._session_parameters)
60-
self._current_chunk_row = PyArrowIterator(io.BytesIO(arrow_bytes), self._arrow_context, self._use_dict_result)
59+
self._current_chunk_row = PyArrowIterator(self._cursor, io.BytesIO(arrow_bytes),
60+
self._arrow_context, self._use_dict_result)
6161
else:
6262
logger.debug("Data from first gs response is empty")
6363
self._current_chunk_row = EmptyPyArrowIterator()

chunk_downloader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,5 +326,5 @@ def __init__(self, cursor, connection):
326326
def to_iterator(self, raw_data_fd, download_time):
327327
from .arrow_iterator import PyArrowIterator
328328
gzip_decoder = GzipFile(fileobj=raw_data_fd, mode='r')
329-
it = PyArrowIterator(gzip_decoder, self._arrow_context, self._cursor._use_dict_result)
329+
it = PyArrowIterator(self._cursor, gzip_decoder, self._arrow_context, self._cursor._use_dict_result)
330330
return it

cpp/ArrowIterator/CArrowChunkIterator.cpp

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,21 @@
1414
#include "TimeConverter.hpp"
1515
#include <string>
1616

17+
#define SF_CHECK_PYTHON_ERR() \
18+
if (py::checkPyError())\
19+
{\
20+
PyObject *type, * val, *traceback;\
21+
PyErr_Fetch(&type, &val, &traceback);\
22+
PyErr_Clear();\
23+
m_currentPyException.reset(val);\
24+
\
25+
Py_XDECREF(type);\
26+
Py_XDECREF(traceback);\
27+
\
28+
return std::make_shared<ReturnVal>(nullptr, m_currentPyException.get());\
29+
}
30+
31+
1732
namespace sf
1833
{
1934

@@ -31,18 +46,15 @@ CArrowChunkIterator::CArrowChunkIterator(PyObject* context, std::vector<std::sha
3146
m_columnCount);
3247
}
3348

34-
PyObject* CArrowChunkIterator::next()
49+
std::shared_ptr<ReturnVal> CArrowChunkIterator::next()
3550
{
3651
m_rowIndexInBatch++;
3752

3853
if (m_rowIndexInBatch < m_rowCountInBatch)
3954
{
4055
this->createRowPyObject();
41-
if (py::checkPyError())
42-
{
43-
return nullptr;
44-
}
45-
return m_latestReturnedRow.get();
56+
SF_CHECK_PYTHON_ERR()
57+
return std::make_shared<ReturnVal>(m_latestReturnedRow.get(), nullptr);
4658
}
4759
else
4860
{
@@ -52,26 +64,21 @@ PyObject* CArrowChunkIterator::next()
5264
m_rowIndexInBatch = 0;
5365
m_rowCountInBatch = (*m_cRecordBatches)[m_currentBatchIndex]->num_rows();
5466
this->initColumnConverters();
55-
if (py::checkPyError())
56-
{
57-
return nullptr;
58-
}
67+
SF_CHECK_PYTHON_ERR()
5968

6069
logger.debug("Current batch index: %d, rows in current batch: %d",
6170
m_currentBatchIndex, m_rowCountInBatch);
6271

6372
this->createRowPyObject();
64-
if (py::checkPyError())
65-
{
66-
return nullptr;
67-
}
68-
return m_latestReturnedRow.get();
73+
SF_CHECK_PYTHON_ERR()
74+
75+
return std::make_shared<ReturnVal>(m_latestReturnedRow.get(), nullptr);
6976
}
7077
}
7178

7279
/** It looks like no one will decrease the ref of this Py_None, so we don't
7380
* increament the ref count here */
74-
return Py_None;
81+
return std::make_shared<ReturnVal>(Py_None, nullptr);
7582
}
7683

7784
void CArrowChunkIterator::createRowPyObject()

cpp/ArrowIterator/CArrowChunkIterator.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class CArrowChunkIterator : public CArrowIterator
3232
/**
3333
* @return a python tuple object which contains all data in current row
3434
*/
35-
PyObject* next() override;
35+
std::shared_ptr<ReturnVal> next() override;
3636

3737
protected:
3838
/**
@@ -65,6 +65,9 @@ class CArrowChunkIterator : public CArrowIterator
6565
/** total number of rows inside current record batch */
6666
int64_t m_rowCountInBatch;
6767

68+
/** pointer to the current python exception object */
69+
py::UniqueRef m_currentPyException;
70+
6871
/** arrow format convert context for the current session */
6972
PyObject* m_context;
7073

cpp/ArrowIterator/CArrowIterator.hpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@
3232
namespace sf
3333
{
3434

35+
/**
36+
* A simple struct to contain return data back cython.
37+
* PyObject would be nullptr if failed and cause string will be populated
38+
*/
39+
class ReturnVal
40+
{
41+
public:
42+
ReturnVal(PyObject * obj, PyObject *except) :
43+
successObj(obj), exception(except)
44+
{
45+
}
46+
47+
PyObject * successObj;
48+
49+
PyObject * exception;
50+
};
51+
3552
/**
3653
* Arrow base iterator implementation in C++.
3754
*/
@@ -46,7 +63,7 @@ class CArrowIterator
4663
/**
4764
* @return a python object which might be current row or an Arrow Table
4865
*/
49-
virtual PyObject* next() = 0;
66+
virtual std::shared_ptr<ReturnVal> next() = 0;
5067

5168
protected:
5269
/** list of all record batch in current chunk */

cpp/ArrowIterator/CArrowTableIterator.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,17 +137,17 @@ CArrowTableIterator::CArrowTableIterator(PyObject* context, std::vector<std::sha
137137
Py_XDECREF(tz);
138138
}
139139

140-
PyObject* CArrowTableIterator::next()
140+
std::shared_ptr<ReturnVal> CArrowTableIterator::next()
141141
{
142142
bool firstDone = this->convertRecordBatchesToTable();
143143
if (firstDone && m_cTable)
144144
{
145145
m_pyTableObjRef.reset(arrow::py::wrap_table(m_cTable));
146-
return m_pyTableObjRef.get();
146+
return std::make_shared<ReturnVal>(m_pyTableObjRef.get(), nullptr);
147147
}
148148
else
149149
{
150-
return Py_None;
150+
return std::make_shared<ReturnVal>(Py_None, nullptr);
151151
}
152152
}
153153

cpp/ArrowIterator/CArrowTableIterator.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class CArrowTableIterator : public CArrowIterator
3232
/**
3333
* @return an arrow table containing all data in all record batches
3434
*/
35-
PyObject* next() override;
35+
std::shared_ptr<ReturnVal> next() override;
3636

3737
private:
3838
/* arrow table of all record batches in current chunk */

test/test_cursor.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import os
99
import time
1010
from datetime import datetime
11+
from sys import platform
1112

1213
import pytest
1314
import pytz
@@ -835,11 +836,17 @@ def test_close_twice(conn_testaccount):
835836
conn_testaccount.close()
836837

837838

838-
def test_fetch_out_of_range_timestamp_value(conn):
839-
with conn() as cnx:
840-
cur = cnx.cursor()
841-
cur.execute("""
842-
select '12345-01-02'::timestamp_ntz
839+
@pytest.mark.skipif(not platform.startswith('linux'), reason="""
840+
Running tests only in linux env. Once 3.52.2 is deployed to preprod2,
841+
we can remove this skip and running this tests on all platform
843842
""")
844-
with pytest.raises(errors.InterfaceError):
845-
cur.fetchone()
843+
def test_fetch_out_of_range_timestamp_value(conn):
844+
for result_format in ['arrow', 'json']:
845+
with conn() as cnx:
846+
cur = cnx.cursor()
847+
cur.execute("alter session set python_connector_query_result_format='{}'".format(result_format))
848+
cur.execute("""
849+
select '12345-01-02'::timestamp_ntz
850+
""")
851+
with pytest.raises(errors.InterfaceError):
852+
cur.fetchone()

test/test_unit_arrow_chunk_iterator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ def iterate_over_test_chunk(pyarrow_type, column_meta, source_data_generator, ex
538538
# seek stream to begnning so that we can read from stream
539539
stream.seek(0)
540540
context = ArrowConverterContext()
541-
it = PyArrowIterator(stream, context, False)
541+
it = PyArrowIterator(None, stream, context, False)
542542
it.init(ROW_UNIT)
543543

544544
count = 0

0 commit comments

Comments
 (0)