Skip to content

Commit 28dc2ed

Browse files
sfc-gh-stakedaankit-bhatnagar167
authored andcommitted
SNOW-87349: Use cython to generate glue code from python to c++ (not enabled)
1 parent f99aa35 commit 28dc2ed

13 files changed

+176
-320
lines changed

arrow_iterator.pyx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# distutils: language = c++
2+
3+
from cpython.ref cimport PyObject
4+
5+
cdef extern from "cpp/ArrowIterator/CArrowChunkIterator.hpp" namespace "sf":
6+
cdef cppclass CArrowChunkIterator:
7+
CArrowChunkIterator()
8+
9+
void addRecordBatch(PyObject * rb)
10+
11+
PyObject *nextRow();
12+
13+
14+
cdef class PyArrowChunkIterator:
15+
cdef CArrowChunkIterator thisptr
16+
17+
def __cinit__(self):
18+
self.thisptr = CArrowChunkIterator()
19+
20+
def add_record_batch(self, rb):
21+
self.thisptr.addRecordBatch(<PyObject *>rb)
22+
23+
def __next__(self):
24+
ret = <object>self.thisptr.nextRow()
25+
if ret is None:
26+
raise StopIteration
27+
else:
28+
return ret

arrow_result.pyx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ from .telemetry import TelemetryField
1010
from .time_util import get_time_millis
1111
try:
1212
from pyarrow.ipc import open_stream
13-
from .libarrow_iterator import ArrowChunkIterator
13+
from .arrow_iterator import PyArrowChunkIterator
1414
except ImportError:
1515
pass
1616

@@ -45,7 +45,7 @@ cdef class ArrowResult:
4545
# result as arrow chunk
4646
arrow_bytes = b64decode(data.get(u'rowsetBase64'))
4747
arrow_reader = open_stream(arrow_bytes)
48-
self._current_chunk_row = ArrowChunkIterator()
48+
self._current_chunk_row = PyArrowChunkIterator()
4949
for rb in arrow_reader:
5050
self._current_chunk_row.add_record_batch(rb)
5151

chunk_downloader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
try:
2222
from pyarrow.ipc import open_stream
23-
from .arrow_iterator import ArrowChunkIterator
23+
from .arrow_iterator import PyArrowChunkIterator
2424
except ImportError:
2525
pass
2626

@@ -345,7 +345,7 @@ def __init__(self, meta):
345345
def to_iterator(self, raw_data_fd, download_time):
346346
gzip_decoder = GzipFile(fileobj=raw_data_fd, mode='r')
347347
reader = open_stream(gzip_decoder)
348-
it = ArrowChunkIterator()
348+
it = PyArrowChunkIterator()
349349
for rb in reader:
350350
it.add_record_batch(rb)
351351
return it

cpp/ArrowIterator/CArrowChunkIterator.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ sf::CArrowChunkIterator::CArrowChunkIterator()
1010
this->reset();
1111
}
1212

13-
void sf::CArrowChunkIterator::addRecordBatch(std::shared_ptr<arrow::RecordBatch> rb)
13+
void sf::CArrowChunkIterator::addRecordBatch(PyObject * rb)
1414
{
15-
m_cRecordBatches.push_back(rb);
15+
std::shared_ptr<arrow::RecordBatch> cRecordBatch;
16+
arrow::Status status = arrow::py::unwrap_record_batch(rb, &cRecordBatch);
17+
18+
m_cRecordBatches.push_back(cRecordBatch);
1619
m_columnCount = m_cRecordBatches[0]->num_columns();
1720
m_batchCount = m_cRecordBatches.size();
1821
}
@@ -44,7 +47,7 @@ PyObject * sf::CArrowChunkIterator::nextRow()
4447
}
4548
}
4649

47-
return NULL;
50+
return Py_None;
4851
}
4952

5053
PyObject * sf::CArrowChunkIterator::currentRowAsTuple()
@@ -68,6 +71,22 @@ void sf::CArrowChunkIterator::initColumnConverters()
6871
std::shared_ptr<arrow::DataType> dt = schema->field(i)->type();
6972
switch(dt->id())
7073
{
74+
75+
case arrow::Type::type::INT8:
76+
m_currentBatchConverters.push_back(
77+
std::make_shared<sf::Int8Converter>(columnArray.get()));
78+
break;
79+
80+
case arrow::Type::type::INT16:
81+
m_currentBatchConverters.push_back(
82+
std::make_shared<sf::Int16Converter>(columnArray.get()));
83+
break;
84+
85+
case arrow::Type::type::INT32:
86+
m_currentBatchConverters.push_back(
87+
std::make_shared<sf::Int32Converter>(columnArray.get()));
88+
break;
89+
7190
case arrow::Type::type::INT64:
7291
m_currentBatchConverters.push_back(
7392
std::make_shared<sf::Int64Converter>(columnArray.get()));

cpp/ArrowIterator/CArrowChunkIterator.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class CArrowChunkIterator
3434
* Add Arrow RecordBach to current chunk
3535
* @param rb recordbatch to be added
3636
*/
37-
void addRecordBatch(std::shared_ptr<arrow::RecordBatch> rb);
37+
void addRecordBatch(PyObject *rb);
3838

3939
/**
4040
* @return a python tuple object which contains all data in current row

cpp/ArrowIterator/IntConverter.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,35 @@ PyObject* sf::Int64Converter::toPyObject(int64_t rowIndex)
1313
return (m_array->IsValid(rowIndex)) ? PyLong_FromLong(m_array->Value(rowIndex)) : Py_None;
1414
}
1515

16+
sf::Int32Converter::Int32Converter(arrow::Array *array)
17+
{
18+
m_array = dynamic_cast<arrow::Int32Array *>(array);
19+
}
20+
21+
PyObject* sf::Int32Converter::toPyObject(int64_t rowIndex)
22+
{
23+
return (m_array->IsValid(rowIndex)) ? PyLong_FromLong(m_array->Value(rowIndex)) : Py_None;
24+
}
25+
26+
27+
sf::Int16Converter::Int16Converter(arrow::Array *array)
28+
{
29+
m_array = dynamic_cast<arrow::Int16Array *>(array);
30+
}
31+
32+
PyObject* sf::Int16Converter::toPyObject(int64_t rowIndex)
33+
{
34+
return (m_array->IsValid(rowIndex)) ? PyLong_FromLong(m_array->Value(rowIndex)) : Py_None;
35+
}
36+
37+
sf::Int8Converter::Int8Converter(arrow::Array *array)
38+
{
39+
m_array = dynamic_cast<arrow::Int8Array *>(array);
40+
}
41+
42+
PyObject* sf::Int8Converter::toPyObject(int64_t rowIndex)
43+
{
44+
return (m_array->IsValid(rowIndex)) ? PyLong_FromLong(m_array->Value(rowIndex)) : Py_None;
45+
}
46+
47+

cpp/ArrowIterator/IntConverter.hpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,45 @@ class Int64Converter : public IColumnConverter
1515
public:
1616
Int64Converter(arrow::Array * array);
1717

18-
PyObject * toPyObject(long rowIndex) override;
18+
PyObject * toPyObject(int64_t rowIndex) override;
1919

2020
private:
2121
arrow::Int64Array * m_array;
2222
};
2323

24+
class Int32Converter : public IColumnConverter
25+
{
26+
public:
27+
Int32Converter(arrow::Array * array);
28+
29+
PyObject * toPyObject(int64_t rowIndex) override;
30+
31+
private:
32+
arrow::Int32Array * m_array;
33+
};
34+
35+
class Int16Converter : public IColumnConverter
36+
{
37+
public:
38+
Int16Converter(arrow::Array * array);
39+
40+
PyObject * toPyObject(int64_t rowIndex) override;
41+
42+
private:
43+
arrow::Int16Array * m_array;
44+
};
45+
46+
class Int8Converter : public IColumnConverter
47+
{
48+
public:
49+
Int8Converter(arrow::Array * array);
50+
51+
PyObject * toPyObject(int64_t rowIndex) override;
52+
53+
private:
54+
arrow::Int8Array * m_array;
55+
};
56+
2457
}
2558

2659
#endif

cpp/ArrowIterator/PyArrowChunkIteratorDef.cpp

Lines changed: 0 additions & 138 deletions
This file was deleted.

cpp/ArrowIterator/StringConverter.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class StringConverter : public IColumnConverter
1414
public:
1515
StringConverter(arrow::Array * array);
1616

17-
PyObject * toPyObject(long rowIndex) override;
17+
PyObject * toPyObject(int64_t rowIndex) override;
1818

1919
private:
2020
arrow::StringArray * m_array;

0 commit comments

Comments
 (0)