Skip to content

Commit da81628

Browse files
sfc-gh-stakeda authored and
ankit-bhatnagar167 committed
SNOW-86947: added timestamp converter for arrow
1 parent 3f9c3d0 commit da81628

29 files changed

+830
-185
lines changed

arrow_context.py

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
# Copyright (c) 2012-2019 Snowflake Computing Inc. All rights reserved.
55
#
66

7-
from datetime import datetime
7+
import time
8+
from datetime import datetime, timedelta
89
from logging import getLogger
910
from .constants import (
10-
PARAMETER_TIMEZONE
11-
)
11+
PARAMETER_TIMEZONE)
12+
from .converter import (
13+
_generate_tzinfo_from_tzoffset)
1214

1315
import pytz
1416

@@ -17,6 +19,8 @@
1719
except ImportError:
1820
tzlocal = None
1921

22+
ZERO_EPOCH = datetime.utcfromtimestamp(0)
23+
2024
logger = getLogger(__name__)
2125

2226

@@ -32,7 +36,7 @@ def timezone(self):
3236
def timezone(self, tz):
3337
self._timezone = tz
3438

35-
def get_session_tz(self):
39+
def _get_session_tz(self):
3640
""" Get the session timezone or use the local computer's timezone. """
3741
try:
3842
tz = 'UTC' if not self.timezone else self.timezone
@@ -46,3 +50,46 @@ def get_session_tz(self):
4650
return datetime.timezone.utc
4751
except AttributeError:
4852
return pytz.timezone('UTC')
53+
54+
def TIMESTAMP_TZ_to_python(self, microseconds, tz):
55+
"""
56+
TIMESTAMP TZ to datetime
57+
58+
The timezone offset is piggybacked
59+
60+
@param microseconds : float
61+
@param tz : int
62+
"""
63+
64+
tzinfo = _generate_tzinfo_from_tzoffset(tz - 1440)
65+
return datetime.fromtimestamp(microseconds, tz=tzinfo)
66+
67+
def TIMESTAMP_TZ_to_python_windows(self, microseconds, tz):
68+
tzinfo = _generate_tzinfo_from_tzoffset(tz - 1440)
69+
t = ZERO_EPOCH + timedelta(seconds=microseconds)
70+
if pytz.utc != tzinfo:
71+
t += tzinfo.utcoffset(t, is_dst=False)
72+
return t.replace(tzinfo=tzinfo)
73+
74+
def TIMESTAMP_NTZ_to_python(self, microseconds):
75+
return datetime.utcfromtimestamp(microseconds)
76+
77+
def TIMESTAMP_NTZ_to_python_windows(self, microseconds):
78+
return ZERO_EPOCH + timedelta(seconds=(microseconds))
79+
80+
def TIMESTAMP_LTZ_to_python(self, microseconds):
81+
tzinfo = self._get_session_tz()
82+
return datetime.fromtimestamp(microseconds, tz=tzinfo)
83+
84+
def TIMESTAMP_LTZ_to_python_windows(self, microseconds):
85+
tzinfo = self._get_session_tz()
86+
try:
87+
t0 = ZERO_EPOCH + timedelta(seconds=(microseconds))
88+
t = pytz.utc.localize(t0, is_dst=False).astimezone(tzinfo)
89+
return t
90+
except OverflowError:
91+
logger.debug(
92+
"OverflowError in converting from epoch time to "
93+
"timestamp_ltz: %s(ms). Falling back to use struct_time."
94+
)
95+
return time.localtime(microseconds)

cpp/ArrowIterator/BinaryConverter.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66
namespace sf
77
{
8+
Logger BinaryConverter::logger("snowflake.connector.BinaryConverter");
9+
810
BinaryConverter::BinaryConverter(std::shared_ptr<arrow::Array> array)
911
: m_array(std::dynamic_pointer_cast<arrow::BinaryArray>(array))
1012
{
1113
}
1214

13-
PyObject* BinaryConverter::toPyObject(int64_t rowIndex)
15+
PyObject* BinaryConverter::toPyObject(int64_t rowIndex) const
1416
{
1517
if (m_array->IsValid(rowIndex))
1618
{

cpp/ArrowIterator/BinaryConverter.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,22 @@
55
#define PC_BINARYCONVERTER_HPP
66

77
#include "IColumnConverter.hpp"
8+
#include "logging.hpp"
89

910
namespace sf
1011
{
1112

1213
class BinaryConverter : public IColumnConverter
1314
{
1415
public:
15-
BinaryConverter(std::shared_ptr<arrow::Array> array);
16+
explicit BinaryConverter(std::shared_ptr<arrow::Array> array);
1617

17-
PyObject* toPyObject(int64_t rowIndex) override;
18+
PyObject* toPyObject(int64_t rowIndex) const override;
1819

1920
private:
2021
std::shared_ptr<arrow::BinaryArray> m_array;
22+
23+
static Logger logger;
2124
};
2225

2326
} // namespace sf

cpp/ArrowIterator/BooleanConverter.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
*/
44
#include "BooleanConverter.hpp"
55

6-
#include <iostream>
7-
86
namespace sf
97
{
108

@@ -13,7 +11,7 @@ BooleanConverter::BooleanConverter(std::shared_ptr<arrow::Array> array)
1311
{
1412
}
1513

16-
PyObject* BooleanConverter::toPyObject(int64_t rowIndex)
14+
PyObject* BooleanConverter::toPyObject(int64_t rowIndex) const
1715
{
1816
if (m_array->IsValid(rowIndex))
1917
{

cpp/ArrowIterator/BooleanConverter.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class BooleanConverter : public IColumnConverter
1414
public:
1515
explicit BooleanConverter(std::shared_ptr<arrow::Array> array);
1616

17-
PyObject* toPyObject(int64_t rowIndex) override;
17+
PyObject* toPyObject(int64_t rowIndex) const override;
1818

1919
private:
2020
std::shared_ptr<arrow::BooleanArray> m_array;

cpp/ArrowIterator/CArrowChunkIterator.cpp

Lines changed: 130 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
#include "BinaryConverter.hpp"
1111
#include "BooleanConverter.hpp"
1212
#include "DateConverter.hpp"
13+
#include "TimeStampConverter.hpp"
1314
#include "TimeConverter.hpp"
14-
#include <iostream>
1515
#include "logging.hpp"
1616

1717
namespace sf
@@ -37,8 +37,7 @@ void CArrowChunkIterator::reset()
3737
m_currentBatchIndex = -1;
3838
m_rowIndexInBatch = -1;
3939
m_rowCountInBatch = 0;
40-
Py_XDECREF(m_latestReturnedRow);
41-
m_latestReturnedRow = nullptr;
40+
m_latestReturnedRow.reset();
4241

4342
logger.info("Arrow chunk info: batchCount %d, columnCount %d", m_batchCount,
4443
m_columnCount);
@@ -47,12 +46,11 @@ void CArrowChunkIterator::reset()
4746
PyObject* CArrowChunkIterator::nextRow()
4847
{
4948
m_rowIndexInBatch++;
50-
Py_XDECREF(m_latestReturnedRow);
51-
m_latestReturnedRow = nullptr;
5249

5350
if (m_rowIndexInBatch < m_rowCountInBatch)
5451
{
55-
return this->currentRowAsTuple();
52+
this->currentRowAsTuple();
53+
return m_latestReturnedRow.get();
5654
}
5755
else
5856
{
@@ -66,7 +64,8 @@ PyObject* CArrowChunkIterator::nextRow()
6664
logger.info("Current batch index: %d, rows in current batch: %d",
6765
m_currentBatchIndex, m_rowCountInBatch);
6866

69-
return this->currentRowAsTuple();
67+
this->currentRowAsTuple();
68+
return m_latestReturnedRow.get();
7069
}
7170
}
7271

@@ -75,15 +74,16 @@ PyObject* CArrowChunkIterator::nextRow()
7574
return Py_None;
7675
}
7776

78-
PyObject* CArrowChunkIterator::currentRowAsTuple()
77+
void CArrowChunkIterator::currentRowAsTuple()
7978
{
80-
PyObject* tuple = PyTuple_New(m_columnCount);
79+
m_latestReturnedRow.reset(PyTuple_New(m_columnCount));
8180
for (int i = 0; i < m_columnCount; i++)
8281
{
8382
PyTuple_SET_ITEM(
84-
tuple, i, m_currentBatchConverters[i]->toPyObject(m_rowIndexInBatch));
83+
m_latestReturnedRow.get(), i,
84+
m_currentBatchConverters[i]->toPyObject(m_rowIndexInBatch));
8585
}
86-
return m_latestReturnedRow = tuple;
86+
return;
8787
}
8888

8989
void CArrowChunkIterator::initColumnConverters()
@@ -189,10 +189,9 @@ void CArrowChunkIterator::initColumnConverters()
189189

190190
default:
191191
{
192-
/** cout is playing a placeholder here and will be replaced by
193-
* exception soon */
194-
std::cout << "unknown arrow internal data type (" << dt->id()
195-
<< ") for FIXED data" << std::endl;
192+
/** TODO: how to throw an exception will be decided later */
193+
logger.error("unknown arrow internal data type(%d) for FIXED data",
194+
dt->id());
196195
break;
197196
}
198197
}
@@ -259,23 +258,130 @@ void CArrowChunkIterator::initColumnConverters()
259258

260259
default:
261260
{
262-
/** cout is playing a placeholder here and will be replaced by
263-
* exception soon */
264-
std::cout << "unknown arrow internal data type (" << dt->id()
265-
<< ") for TIME data" << std::endl;
261+
/** TODO: how to throw an exception will be decided later */
262+
logger.error("unknown arrow internal data type(%d) for TIME data",
263+
dt->id());
266264
break;
267265
}
268266
}
269267
break;
270268
}
271269

270+
case SnowflakeType::Type::TIMESTAMP_NTZ:
271+
{
272+
int scale = metaData
273+
? std::stoi(metaData->value(metaData->FindKey("scale")))
274+
: 9;
275+
switch (dt->id())
276+
{
277+
case arrow::Type::type::INT64:
278+
{
279+
m_currentBatchConverters.push_back(
280+
std::make_shared<sf::OneFieldTimeStampNTZConverter>(
281+
columnArray, scale, m_context));
282+
break;
283+
}
284+
285+
case arrow::Type::type::STRUCT:
286+
{
287+
m_currentBatchConverters.push_back(
288+
std::make_shared<sf::TwoFieldTimeStampNTZConverter>(
289+
columnArray, scale, m_context));
290+
break;
291+
}
292+
293+
default:
294+
{
295+
/** TODO: how to throw an exception will be decided later */
296+
logger.error(
297+
"unknown arrow internal data type(%d) for TIMESTAMP_NTZ data",
298+
dt->id());
299+
break;
300+
}
301+
}
302+
break;
303+
}
304+
305+
case SnowflakeType::Type::TIMESTAMP_LTZ:
306+
{
307+
int scale = metaData
308+
? std::stoi(metaData->value(metaData->FindKey("scale")))
309+
: 9;
310+
switch (dt->id())
311+
{
312+
case arrow::Type::type::INT64:
313+
{
314+
m_currentBatchConverters.push_back(
315+
std::make_shared<sf::OneFieldTimeStampLTZConverter>(
316+
columnArray, scale, m_context));
317+
break;
318+
}
319+
320+
case arrow::Type::type::STRUCT:
321+
{
322+
m_currentBatchConverters.push_back(
323+
std::make_shared<sf::TwoFieldTimeStampLTZConverter>(
324+
columnArray, scale, m_context));
325+
break;
326+
}
327+
328+
default:
329+
{
330+
/** TODO: how to throw an exception will be decided later */
331+
logger.error(
332+
"unknown arrow internal data type(%d) for TIMESTAMP_LTZ data",
333+
dt->id());
334+
break;
335+
}
336+
}
337+
break;
338+
}
339+
340+
case SnowflakeType::Type::TIMESTAMP_TZ:
341+
{
342+
int scale = metaData
343+
? std::stoi(metaData->value(metaData->FindKey("scale")))
344+
: 9;
345+
int byteLength =
346+
metaData
347+
? std::stoi(metaData->value(metaData->FindKey("byteLength")))
348+
: 16;
349+
switch (byteLength)
350+
{
351+
case 8:
352+
{
353+
m_currentBatchConverters.push_back(
354+
std::make_shared<sf::TwoFieldTimeStampTZConverter>(
355+
columnArray, scale, m_context));
356+
break;
357+
}
358+
359+
case 16:
360+
{
361+
m_currentBatchConverters.push_back(
362+
std::make_shared<sf::ThreeFieldTimeStampTZConverter>(
363+
columnArray, scale, m_context));
364+
break;
365+
}
366+
367+
default:
368+
{
369+
/** TODO: how to throw an exception will be decided later */
370+
logger.error(
371+
"unknown arrow internal data type(%d) for TIMESTAMP_TZ data",
372+
dt->id());
373+
break;
374+
}
375+
}
376+
377+
break;
378+
}
379+
272380
default:
273381
{
274-
/** cout is playing a placeholder here and will be replaced by exception
275-
* soon */
276-
std::cout << "[ERROR] unknown snowflake data type : "
277-
<< metaData->value(metaData->FindKey("logicalType"))
278-
<< std::endl;
382+
/** TODO: how to throw an exception will be decided later */
383+
logger.error("unknown snowflake data type : %d",
384+
metaData->value(metaData->FindKey("logicalType")));
279385
break;
280386
}
281387
}

0 commit comments

Comments
 (0)