Skip to content

Commit fd76978

Browse files
committed
SNOW-870230: robust error stream handling (#1710)
1 parent 83eff9d commit fd76978

23 files changed

+935
-391
lines changed

src/snowflake/connector/cpp/ArrowIterator/BinaryConverter.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ PyObject* BinaryConverter::toPyObject(int64_t rowIndex) const
2020
Py_RETURN_NONE;
2121
}
2222
ArrowStringView stringView = ArrowArrayViewGetStringUnsafe(m_array, rowIndex);
23-
return PyByteArray_FromStringAndSize(stringView.data, stringView.size_bytes);
23+
return PyByteArray_FromStringAndSize(stringView.data, stringView.size_bytes);
2424
}
2525

2626
} // namespace sf

src/snowflake/connector/cpp/ArrowIterator/CArrowChunkIterator.cpp

Lines changed: 3 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -17,28 +17,15 @@
1717
#include <string>
1818
#include <vector>
1919

20-
21-
#define SF_CHECK_PYTHON_ERR() \
22-
if (py::checkPyError())\
23-
{\
24-
PyObject *type, * val, *traceback;\
25-
PyErr_Fetch(&type, &val, &traceback);\
26-
PyErr_Clear();\
27-
m_currentPyException.reset(val);\
28-
\
29-
Py_XDECREF(type);\
30-
Py_XDECREF(traceback);\
31-
\
32-
return std::make_shared<ReturnVal>(nullptr, m_currentPyException.get());\
33-
}
34-
35-
3620
namespace sf
3721
{
3822

3923
CArrowChunkIterator::CArrowChunkIterator(PyObject* context, char* arrow_bytes, int64_t arrow_bytes_size, PyObject *use_numpy)
4024
: CArrowIterator(arrow_bytes, arrow_bytes_size), m_latestReturnedRow(nullptr), m_context(context)
4125
{
26+
if (py::checkPyError()) {
27+
return;
28+
}
4229
m_currentBatchIndex = -1;
4330
m_rowIndexInBatch = -1;
4431
m_rowCountInBatch = 0;

src/snowflake/connector/cpp/ArrowIterator/CArrowChunkIterator.hpp

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -66,9 +66,6 @@ class CArrowChunkIterator : public CArrowIterator
6666
/** total number of rows inside current record batch */
6767
int64_t m_rowCountInBatch;
6868

69-
/** pointer to the current python exception object */
70-
py::UniqueRef m_currentPyException;
71-
7269
/** arrow format convert context for the current session */
7370
PyObject* m_context;
7471

src/snowflake/connector/cpp/ArrowIterator/CArrowIterator.cpp

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,11 +45,18 @@ CArrowIterator::CArrowIterator(char* arrow_bytes, int64_t arrow_bytes_size)
4545
SF_CHECK_ARROW_RC_AND_RELEASE_ARROW_STREAM(returnCode, stream, "[Snowflake Exception] error setting ArrowArrayView from array : %s, error code: %d", ArrowErrorMessage(&error), returnCode);
4646
m_ipcArrowArrayViewVec.push_back(std::move(newUniqueArrayView));
4747
} else {
48+
SF_CHECK_ARROW_RC_AND_RELEASE_ARROW_STREAM(retcode, stream, "[Snowflake Exception] error getting schema from stream, error code: %d", returnCode);
4849
break;
4950
}
5051
}
5152
stream.release(&stream);
5253
logger->debug(__FILE__, __func__, __LINE__, "Arrow BatchSize: %d", m_ipcArrowArrayVec.size());
5354
}
5455

56+
std::shared_ptr<ReturnVal> CArrowIterator::checkInitializationStatus()
57+
{
58+
SF_CHECK_PYTHON_ERR()
59+
return std::make_shared<ReturnVal>(nullptr, nullptr);
60+
}
61+
5562
}

src/snowflake/connector/cpp/ArrowIterator/CArrowIterator.hpp

Lines changed: 24 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -77,13 +77,28 @@ static const char* NANOARROW_TYPE_ENUM_STRING[] = {
7777
#define SF_CHECK_ARROW_RC_AND_RELEASE_ARROW_STREAM(arrow_status, stream, format_string, ...) \
7878
if (arrow_status != NANOARROW_OK) \
7979
{ \
80-
std::string errorInfo = Logger::formatString(format_string, ##__VA_ARGS__); \
81-
logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); \
82-
PyErr_SetString(PyExc_Exception, errorInfo.c_str()); \
80+
std::string errorInfo = std::string(format_string) + std::string(", error info: ") + std::string(stream.get_last_error(&stream)); \
81+
std::string fullErrorInfo = Logger::formatString(errorInfo.c_str(), ##__VA_ARGS__); \
82+
logger->error(__FILE__, __func__, __LINE__, fullErrorInfo.c_str()); \
83+
PyErr_SetString(PyExc_Exception, fullErrorInfo.c_str()); \
8384
stream.release(&stream); \
8485
return; \
8586
}
8687

88+
#define SF_CHECK_PYTHON_ERR() \
89+
if (py::checkPyError())\
90+
{\
91+
PyObject *type, * val, *traceback;\
92+
PyErr_Fetch(&type, &val, &traceback);\
93+
PyErr_Clear();\
94+
m_currentPyException.reset(val);\
95+
\
96+
Py_XDECREF(type);\
97+
Py_XDECREF(traceback);\
98+
\
99+
return std::make_shared<ReturnVal>(nullptr, m_currentPyException.get());\
100+
}
101+
87102
namespace sf
88103
{
89104

@@ -122,13 +137,19 @@ class CArrowIterator
122137
virtual std::vector<uintptr_t> getArrowArrayPtrs() { return {}; };
123138
virtual std::vector<uintptr_t> getArrowSchemaPtrs() { return {}; };
124139

140+
/** check whether initialization succeeded or encountered error */
141+
std::shared_ptr<ReturnVal> checkInitializationStatus();
142+
125143
protected:
126144
static Logger* logger;
127145

128146
/** nanoarrow data */
129147
std::vector<nanoarrow::UniqueArray> m_ipcArrowArrayVec;
130148
std::vector<nanoarrow::UniqueArrayView> m_ipcArrowArrayViewVec;
131149
nanoarrow::UniqueSchema m_ipcArrowSchema;
150+
151+
/** pointer to the current python exception object */
152+
py::UniqueRef m_currentPyException;
132153
};
133154
}
134155

src/snowflake/connector/cpp/ArrowIterator/CArrowTableIterator.cpp

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -190,6 +190,9 @@ const bool number_to_decimal
190190
m_context(context),
191191
m_convert_number_to_decimal(number_to_decimal)
192192
{
193+
if (py::checkPyError()) {
194+
return;
195+
}
193196
py::UniqueRef tz(PyObject_GetAttrString(m_context, "_timezone"));
194197
PyArg_Parse(tz.get(), "s", &m_timezone);
195198
}

src/snowflake/connector/cpp/ArrowIterator/arrow_iterator.pyx

Lines changed: 34 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ cdef extern from "CArrowIterator.hpp" namespace "sf":
3939

4040
cdef cppclass CArrowIterator:
4141
shared_ptr[ReturnVal] next() except +;
42+
shared_ptr[ReturnVal] checkInitializationStatus() except +;
4243
vector[uintptr_t] getArrowArrayPtrs();
4344
vector[uintptr_t] getArrowSchemaPtrs();
4445

@@ -114,11 +115,11 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
114115
object use_dict_result,
115116
object numpy,
116117
object number_to_decimal,
117-
118+
object is_table_unit = False,
118119
):
119120
self.context = arrow_context
120121
self.cIterator = NULL
121-
self.unit = ''
122+
self.unit = IterUnit.TABLE_UNIT.value if is_table_unit else IterUnit.ROW_UNIT.value
122123
self.use_dict_result = use_dict_result
123124
self.cursor = cursor
124125
self.use_numpy = numpy
@@ -129,6 +130,11 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
129130
self.arrow_bytes_size = len(arrow_bytes)
130131
self.python_bytes = arrow_bytes
131132

133+
if self.unit == IterUnit.TABLE_UNIT.value:
134+
self.init_table_unit()
135+
else:
136+
self.init_row_unit()
137+
132138
def __dealloc__(self):
133139
del self.cIterator
134140

@@ -174,6 +180,9 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
174180
self.unit = iter_unit
175181

176182
def init_row_unit(self) -> None:
183+
if self.cIterator is not NULL:
184+
return
185+
177186
self.cIterator = new CArrowChunkIterator(
178187
<PyObject *> self.context,
179188
self.arrow_bytes,
@@ -187,6 +196,16 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
187196
self.arrow_bytes_size,
188197
<PyObject *> self.use_numpy
189198
)
199+
self.cret = self.cIterator.checkInitializationStatus()
200+
if self.cret.get().exception:
201+
Error.errorhandler_wrapper(
202+
self.cursor.connection if self.cursor is not None else None,
203+
self.cursor,
204+
OperationalError,
205+
{
206+
'msg': f'Failed to open arrow stream: {str(<object>self.cret.get().exception)}',
207+
'errno': ER_FAILED_TO_READ_ARROW_STREAM
208+
})
190209
snow_logger.debug(msg=f"Batches read: {self.cIterator.getArrowArrayPtrs().size()}", path_name=__file__, func_name="init_row_unit")
191210

192211
def init_table_unit(self) -> None:
@@ -202,13 +221,25 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
202221
},
203222
)
204223

224+
if self.cIterator is not NULL:
225+
return
226+
205227
self.cIterator = new CArrowTableIterator(
206228
<PyObject *> self.context,
207229
self.arrow_bytes,
208230
self.arrow_bytes_size,
209231
self.number_to_decimal,
210232
)
211-
233+
self.cret = self.cIterator.checkInitializationStatus()
234+
if self.cret.get().exception:
235+
Error.errorhandler_wrapper(
236+
self.cursor.connection if self.cursor is not None else None,
237+
self.cursor,
238+
OperationalError,
239+
{
240+
'msg': f'Failed to open arrow stream: {str(<object>self.cret.get().exception)}',
241+
'errno': ER_FAILED_TO_READ_ARROW_STREAM
242+
})
212243
self.cret = self.cIterator.next()
213244
self.unit = 'table'
214245
self.nanoarrow_Table = self.cIterator.getArrowArrayPtrs()

src/snowflake/connector/cpp/ArrowIterator/flatcc/flatcc_assert.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ extern "C" {
2525
* `FLATCC_NO_ASSERT`.
2626
*/
2727

28-
#ifdef FLATCC_NO_ASSERT
28+
#ifdef FLATCC_NO_ASSERT
2929
/* NOTE: This will not affect inclusion of <assert.h> for static assertions. */
3030
#undef FLATCC_ASSERT
3131
#define FLATCC_ASSERT(x) ((void)0)

src/snowflake/connector/cpp/ArrowIterator/flatcc/flatcc_epilogue.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,4 +5,3 @@
55
#endif
66

77
#include "flatcc/portable/pdiagnostic_pop.h"
8-

src/snowflake/connector/cpp/ArrowIterator/flatcc/portable/paligned_alloc.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,7 @@ extern "C" {
5353
#if defined (__MINGW32__)
5454
/* MingW does not provide aligned_alloc despite defining _ISOC11_SOURCE */
5555
#define PORTABLE_C11_ALIGNED_ALLOC 0
56-
#elif defined (_ISOC11_SOURCE)
56+
#elif defined (_ISOC11_SOURCE)
5757
/* glibc aligned_alloc detection, but MingW is not truthful */
5858
#define PORTABLE_C11_ALIGNED_ALLOC 1
5959
#elif defined (__GLIBC__)
@@ -178,7 +178,7 @@ static inline void *__portable_aligned_alloc(size_t alignment, size_t size)
178178
static inline void __portable_aligned_free(void *p)
179179
{
180180
char *raw;
181-
181+
182182
if (p) {
183183
raw = (char*)((void **)p)[-1];
184184
free(raw);

0 commit comments

Comments
 (0)