Skip to content

Commit d4006c0

Browse files
sfc-gh-stakedaankit-bhatnagar167
authored andcommitted
SNOW-86947: update date, time and timestamp converter for arrow
1 parent 6ffa070 commit d4006c0

13 files changed

+452
-18
lines changed

arrow_context.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2012-2019 Snowflake Computing Inc. All rights reserved.
5+
#
6+
7+
from datetime import datetime
8+
from logging import getLogger
9+
from .constants import (
10+
PARAMETER_TIMEZONE
11+
)
12+
13+
import pytz
14+
15+
try:
16+
import tzlocal
17+
except ImportError:
18+
tzlocal = None
19+
20+
logger = getLogger(__name__)
21+
22+
23+
class ArrowConverterContext(object):
    """ Session settings consulted while converting Arrow result values.

    The only setting kept at the moment is the session timezone name, read
    from the session parameters under ``PARAMETER_TIMEZONE``.
    """

    def __init__(self, session_parameters=None):
        """
        :param session_parameters: mapping of session parameter names to
            values. ``None`` or an empty mapping leaves the timezone unset.
        """
        # None is used as the default instead of a shared mutable ``{}``; it
        # also lets a caller hand over session parameters that are still None.
        if session_parameters and PARAMETER_TIMEZONE in session_parameters:
            self._timezone = session_parameters[PARAMETER_TIMEZONE]
        else:
            self._timezone = None

    @property
    def timezone(self):
        """ Session timezone name, or None when it has not been set. """
        return self._timezone

    @timezone.setter
    def timezone(self, tz):
        self._timezone = tz

    def get_session_tz(self):
        """ Get the tzinfo for the session timezone.

        An unset (or empty) timezone is treated as UTC. When pytz does not
        know the configured name, a warning is logged and the local
        computer's timezone is returned if tzlocal is installed; otherwise
        UTC is returned.
        """
        tz_name = self.timezone or 'UTC'
        try:
            return pytz.timezone(tz_name)
        except pytz.exceptions.UnknownTimeZoneError:
            logger.warning('converting to tzinfo failed')
            if tzlocal is not None:
                return tzlocal.get_localzone()
            # ``datetime`` in this module is the class brought in by
            # ``from datetime import datetime``; it has no ``timezone``
            # attribute, so pytz's UTC object is returned directly.
            return pytz.utc

arrow_iterator.pyx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ from cpython.ref cimport PyObject
88

99
cdef extern from "cpp/ArrowIterator/CArrowChunkIterator.hpp" namespace "sf":
1010
cdef cppclass CArrowChunkIterator:
11-
CArrowChunkIterator()
11+
CArrowChunkIterator(PyObject* context)
1212

1313
void addRecordBatch(PyObject * rb)
1414

@@ -20,16 +20,16 @@ cdef extern from "cpp/ArrowIterator/CArrowChunkIterator.hpp" namespace "sf":
2020
cdef class PyArrowChunkIterator:
2121
cdef CArrowChunkIterator * cIterator
2222

23-
def __cinit__(self, arrow_stream_reader):
24-
self.cIterator = new CArrowChunkIterator()
23+
def __cinit__(PyArrowChunkIterator self, object arrow_stream_reader, object arrow_context):
24+
self.cIterator = new CArrowChunkIterator(<PyObject*>arrow_context)
2525
for rb in arrow_stream_reader:
2626
self.cIterator.addRecordBatch(<PyObject *>rb)
2727
self.cIterator.reset()
2828

29-
def __dealloc(self):
29+
def __dealloc__(PyArrowChunkIterator self):
3030
del self.cIterator
3131

32-
def __next__(self):
32+
def __next__(PyArrowChunkIterator self):
3333
ret = <object>self.cIterator.nextRow()
3434
if ret is None:
3535
raise StopIteration

arrow_result.pyx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ from .time_util import get_time_millis
1111
try:
1212
from pyarrow.ipc import open_stream
1313
from .arrow_iterator import PyArrowChunkIterator
14+
from .arrow_context import ArrowConverterContext
1415
except ImportError:
1516
pass
1617

@@ -30,6 +31,7 @@ cdef class ArrowResult:
3031
object _column_idx_to_name
3132
object _current_chunk_row
3233
object _chunk_downloader
34+
object _arrow_context
3335

3436
def __init__(self, raw_response, cursor):
3537
self._reset()
@@ -45,7 +47,8 @@ cdef class ArrowResult:
4547
# result as arrow chunk
4648
arrow_bytes = b64decode(data.get(u'rowsetBase64'))
4749
arrow_reader = open_stream(arrow_bytes)
48-
self._current_chunk_row = PyArrowChunkIterator(arrow_reader)
50+
self._arrow_context = ArrowConverterContext(self._connection._session_parameters)
51+
self._current_chunk_row = PyArrowChunkIterator(arrow_reader, self._arrow_context)
4952

5053
if u'chunks' in data:
5154
chunks = data[u'chunks']
@@ -137,4 +140,5 @@ cdef class ArrowResult:
137140

138141
self._chunk_count = 0
139142
self._chunk_downloader = None
143+
self._arrow_context = None
140144

chunk_downloader.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
try:
2222
from pyarrow.ipc import open_stream
2323
from .arrow_iterator import PyArrowChunkIterator
24+
from .arrow_context import ArrowConverterContext
2425
except ImportError:
2526
pass
2627

@@ -254,7 +255,7 @@ def _fetch_chunk(self, url, headers):
254255
handler = JsonBinaryHandler(is_raw_binary_iterator=True,
255256
use_ijson=self._use_ijson) \
256257
if self._query_result_format == 'json' else \
257-
ArrowBinaryHandler(self._cursor.description)
258+
ArrowBinaryHandler(self._cursor.description, self._connection)
258259

259260
return self._connection.rest.fetch(
260261
u'get', url, headers,
@@ -321,14 +322,15 @@ def to_iterator(self, raw_data_fd, download_time):
321322

322323
class ArrowBinaryHandler(RawBinaryDataHandler):
323324

324-
def __init__(self, meta):
325+
def __init__(self, meta, connection):
325326
self._meta = meta
327+
self._arrow_context = ArrowConverterContext(connection._session_parameters)
326328

327329
"""
328330
Handler to consume data as arrow stream
329331
"""
330332
def to_iterator(self, raw_data_fd, download_time):
331333
gzip_decoder = GzipFile(fileobj=raw_data_fd, mode='r')
332334
reader = open_stream(gzip_decoder)
333-
it = PyArrowChunkIterator(reader)
335+
it = PyArrowChunkIterator(reader, self._arrow_context)
334336
return it

cpp/ArrowIterator/CArrowChunkIterator.cpp

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@
1010
#include "BinaryConverter.hpp"
1111
#include "BooleanConverter.hpp"
1212
#include "DateConverter.hpp"
13+
#include "TimeConverter.hpp"
1314
#include <iostream>
1415
#include "logging.hpp"
1516

1617
namespace sf
1718
{
1819
Logger CArrowChunkIterator::logger("snowflake.connector.CArrowChunkIterator");
1920

20-
CArrowChunkIterator::CArrowChunkIterator() : m_latestReturnedRow(nullptr)
21+
/**
 * Create an iterator that has not returned any row yet and remember the
 * converter context handed in by the caller.
 *
 * NOTE(review): the pointer is stored as given; no reference count change is
 * visible here, so the caller presumably keeps the context object alive for
 * the lifetime of this iterator -- confirm.
 *
 * @param context arrow format convert context for the current session
 */
CArrowChunkIterator::CArrowChunkIterator(PyObject* context)
    : m_latestReturnedRow(nullptr),
      m_context(context)
{
}
2325

@@ -232,6 +234,41 @@ void CArrowChunkIterator::initColumnConverters()
232234
break;
233235
}
234236

237+
case SnowflakeType::Type::TIME:
238+
{
239+
int scale = metaData
240+
? std::stoi(metaData->value(metaData->FindKey("scale")))
241+
: 9;
242+
switch (dt->id())
243+
{
244+
case arrow::Type::type::INT32:
245+
{
246+
m_currentBatchConverters.push_back(
247+
std::make_shared<sf::TimeConverter<arrow::Int32Array>>(
248+
columnArray, scale));
249+
break;
250+
}
251+
252+
case arrow::Type::type::INT64:
253+
{
254+
m_currentBatchConverters.push_back(
255+
std::make_shared<sf::TimeConverter<arrow::Int64Array>>(
256+
columnArray, scale));
257+
break;
258+
}
259+
260+
default:
261+
{
262+
/** cout is playing a placeholder here and will be replaced by
263+
* exception soon */
264+
std::cout << "unknown arrow internal data type (" << dt->id()
265+
<< ") for TIME data" << std::endl;
266+
break;
267+
}
268+
}
269+
break;
270+
}
271+
235272
default:
236273
{
237274
/** cout is playing a placeholder here and will be replaced by exception

cpp/ArrowIterator/CArrowChunkIterator.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class CArrowChunkIterator
2626
/**
2727
* Constructor
2828
*/
29-
CArrowChunkIterator();
29+
CArrowChunkIterator(PyObject* context);
3030

3131
/**
3232
* Destructor
@@ -74,6 +74,9 @@ class CArrowChunkIterator
7474
/** list of column converters*/
7575
std::vector<std::shared_ptr<sf::IColumnConverter>> m_currentBatchConverters;
7676

77+
/** arrow format convert context for the current session */
78+
PyObject* m_context;
79+
7780
/**
7881
* @return python object of tuple which is tuple of all row values
7982
*/

cpp/ArrowIterator/IntConverter.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#define PC_INTCONVERTER_HPP
66

77
#include "IColumnConverter.hpp"
8+
#include <iostream>
89

910
namespace sf
1011
{
@@ -18,6 +19,16 @@ class IntConverter : public IColumnConverter
1819
{
1920
}
2021

22+
/** Overload for 64-bit values: forwards to PyLong_FromLongLong. */
PyObject* pyLongForward(int64_t value)
{
  PyObject* const boxed = PyLong_FromLongLong(value);
  return boxed;
}
26+
27+
/** Overload for 32-bit values: forwards to PyLong_FromLong. */
PyObject* pyLongForward(int32_t value)
{
  PyObject* const boxed = PyLong_FromLong(value);
  return boxed;
}
31+
2132
PyObject* toPyObject(int64_t rowIndex) override;
2233

2334
private:
@@ -29,7 +40,8 @@ PyObject* IntConverter<T>::toPyObject(int64_t rowIndex)
2940
{
3041
if (m_array->IsValid(rowIndex))
3142
{
32-
return PyLong_FromLongLong(m_array->Value(rowIndex));
43+
// TODO: this forwarding function needs to be tested on Win64
44+
return pyLongForward(m_array->Value(rowIndex));
3345
}
3446
else
3547
{
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright (c) 2013-2019 Snowflake Computing
3+
*/
4+
#include "TimeConverter.hpp"
5+
#include "Python/Helpers.hpp"
6+
#include "Util/time.hpp"
7+
8+
namespace sf
9+
{
10+
/** this file is here for future use and if this is useless at the end, it will
11+
* be removed */
12+
} // namespace sf
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright (c) 2013-2019 Snowflake Computing
3+
*/
4+
#ifndef PC_TIMECONVERTER_HPP
5+
#define PC_TIMECONVERTER_HPP
6+
7+
#include "IColumnConverter.hpp"
8+
#include "Python/Common.hpp"
9+
#include "Python/Helpers.hpp"
10+
#include "Util/time.hpp"
11+
12+
namespace sf
13+
{
14+
15+
template <typename T>
16+
class TimeConverter : public IColumnConverter
17+
{
18+
public:
19+
TimeConverter(std::shared_ptr<arrow::Array> array, int32_t scale)
20+
: m_array(std::dynamic_pointer_cast<T>(array)), m_scale(scale)
21+
{
22+
}
23+
24+
PyObject* toPyObject(int64_t rowIndex) override;
25+
26+
private:
27+
/** can be arrow::Int32Array and arrow::Int64Array */
28+
std::shared_ptr<T> m_array;
29+
30+
int32_t m_scale;
31+
32+
static py::UniqueRef& m_pyDatetimeTime();
33+
};
34+
35+
template <typename T>
36+
PyObject* TimeConverter<T>::toPyObject(int64_t rowIndex)
37+
{
38+
if (m_array->IsValid(rowIndex))
39+
{
40+
int64_t seconds = m_array->Value(rowIndex);
41+
using namespace internal;
42+
py::PyUniqueLock lock;
43+
return PyObject_CallFunction(m_pyDatetimeTime().get(), "iiii",
44+
getHourFromSeconds(seconds, m_scale),
45+
getMinuteFromSeconds(seconds, m_scale),
46+
getSecondFromSeconds(seconds, m_scale),
47+
getMicrosecondFromSeconds(seconds, m_scale));
48+
}
49+
else
50+
{
51+
Py_RETURN_NONE;
52+
}
53+
}
54+
55+
template <typename T>
56+
py::UniqueRef& TimeConverter<T>::m_pyDatetimeTime()
57+
{
58+
static py::UniqueRef pyDatetimeTime;
59+
if (pyDatetimeTime.empty())
60+
{
61+
py::PyUniqueLock lock;
62+
py::UniqueRef pyDatetimeModule;
63+
py::importPythonModule("datetime", pyDatetimeModule);
64+
/** TODO : to check status here */
65+
66+
py::importFromModule(pyDatetimeModule, "time", pyDatetimeTime);
67+
}
68+
return pyDatetimeTime;
69+
}
70+
71+
} // namespace sf
72+
73+
#endif // PC_TIMECONVERTER_HPP

0 commit comments

Comments
 (0)