Skip to content

Commit 997f327

Browse files
flag truncate_timestamps added to fetcharrow functions
Arrow only allows timestamps with years between 1400 and 9999. This new flag allows out-of-range timestamps to be truncated to that range, which prevents exceptions.
1 parent 5556625 commit 997f327

File tree

8 files changed

+69
-16
lines changed

8 files changed

+69
-16
lines changed

cpp/turbodbc/Library/src/time_helpers.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#include <turbodbc/time_helpers.h>
2-
32
#ifdef _WIN32
43
#include <windows.h>
54
#endif
@@ -26,6 +25,21 @@ int64_t timestamp_to_microseconds(char const * data_pointer)
2625
return (ts - timestamp_epoch).total_microseconds();
2726
}
2827

28+
int64_t timestamp_to_microseconds_truncated(char const * data_pointer)
29+
{
30+
auto & sql_ts = *reinterpret_cast<SQL_TIMESTAMP_STRUCT const *>(data_pointer);
31+
if (sql_ts.year > 9999) {
32+
SQL_TIMESTAMP_STRUCT new_sql_ts = {9999, 12, 31, 23, 59, 59, 999999000};
33+
return timestamp_to_microseconds(reinterpret_cast<char const *>(&new_sql_ts));
34+
}
35+
if(sql_ts.year < 1400) {
36+
SQL_TIMESTAMP_STRUCT new_sql_ts = {1400, 1, 1, 0, 0, 0, 0};
37+
return timestamp_to_microseconds(reinterpret_cast<char const *>(&new_sql_ts));
38+
}
39+
40+
return timestamp_to_microseconds(data_pointer);
41+
}
42+
2943

3044
void microseconds_to_timestamp(int64_t microseconds, char * data_pointer)
3145
{

cpp/turbodbc/Library/turbodbc/time_helpers.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@ namespace turbodbc {
1010
*/
1111
int64_t timestamp_to_microseconds(char const * data_pointer);
1212

13+
14+
/**
15+
* @brief Convert an SQL_TIMESTAMP_STRUCT stored at data_pointer to an
16+
* integer describing the elapsed microseconds since the POSIX epoch
17+
* and truncated to arrow valid range (year 1400 to 9999)
18+
*/
19+
int64_t timestamp_to_microseconds_truncated(char const * data_pointer);
20+
21+
1322
/**
1423
* @brief Convert the number of microseconds since the POSIX epoch
1524
* to a timestamp and store it in an SQL_TIMESTAMP_STRUCT located

cpp/turbodbc/Test/tests/time_helpers_test.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <sql.h>
99

1010
using turbodbc::timestamp_to_microseconds;
11+
using turbodbc::timestamp_to_microseconds_truncated;
1112
using turbodbc::microseconds_to_timestamp;
1213
using turbodbc::nanoseconds_to_timestamp;
1314
using turbodbc::date_to_days;
@@ -59,6 +60,24 @@ TEST(TimeHelpersTest, MicrosecondsToTimestampForYear4000)
5960
}
6061

6162

63+
TEST(TimeHelpersTest, TimestampToMicrosecondsTruncatedForYear10000)
64+
{
65+
SQL_TIMESTAMP_STRUCT data = {10000, 01, 02, 3, 4, 5, 123456000};
66+
// expectation one microsecond before year 10000
67+
std::int64_t expected = 253402300799999999;
68+
EXPECT_EQ(expected, timestamp_to_microseconds_truncated(reinterpret_cast<char const *>(&data)));
69+
}
70+
71+
72+
TEST(TimeHelpersTest, TimestampToMicrosecondsTruncatedForYear100)
73+
{
74+
SQL_TIMESTAMP_STRUCT data = {100, 01, 02, 3, 4, 5, 0};
75+
// expectation january first 1400
76+
std::int64_t expected = -17987443200000000;
77+
EXPECT_EQ(expected, timestamp_to_microseconds_truncated(reinterpret_cast<char const *>(&data)));
78+
}
79+
80+
6281
TEST(TimeHelpersTest, NanosecondsToTimestampForEpoch)
6382
{
6483
std::int64_t const nanoseconds = 0;

cpp/turbodbc_arrow/Library/src/arrow_result_set.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ Status AppendIntsToBuilder(size_t rows_in_batch, std::unique_ptr<ArrayBuilder> c
159159

160160
}
161161

162-
arrow_result_set::arrow_result_set(turbodbc::result_sets::result_set & base, bool strings_as_dictionary, bool adaptive_integers) :
163-
base_result_(base), strings_as_dictionary_(strings_as_dictionary), adaptive_integers_(adaptive_integers)
162+
arrow_result_set::arrow_result_set(turbodbc::result_sets::result_set & base, bool strings_as_dictionary, bool adaptive_integers, bool truncate_timestamps) :
163+
base_result_(base), strings_as_dictionary_(strings_as_dictionary), adaptive_integers_(adaptive_integers), truncate_timestamps_(truncate_timestamps)
164164
{
165165
}
166166

@@ -213,12 +213,14 @@ Status append_to_bool_builder(size_t rows_in_batch, std::unique_ptr<ArrayBuilder
213213
return typed_builder->AppendValues(data_ptr, rows_in_batch, valid_bytes);
214214
}
215215

216-
Status append_to_timestamp_builder(size_t rows_in_batch, std::unique_ptr<ArrayBuilder> const& builder, cpp_odbc::multi_value_buffer const& input_buffer, uint8_t*) {
216+
Status append_to_timestamp_builder(size_t rows_in_batch, std::unique_ptr<ArrayBuilder> const& builder, cpp_odbc::multi_value_buffer const& input_buffer, uint8_t*, bool truncate_timestamp) {
217217
auto typed_builder = static_cast<TimestampBuilder*>(builder.get());
218218
for (std::size_t j = 0; j < rows_in_batch; ++j) {
219219
auto element = input_buffer[j];
220220
if (element.indicator == SQL_NULL_DATA) {
221221
ARROW_RETURN_NOT_OK(typed_builder->AppendNull());
222+
} else if (truncate_timestamp) {
223+
ARROW_RETURN_NOT_OK(typed_builder->Append(turbodbc::timestamp_to_microseconds_truncated(element.data_pointer)));
222224
} else {
223225
ARROW_RETURN_NOT_OK(typed_builder->Append(turbodbc::timestamp_to_microseconds(element.data_pointer)));
224226
}
@@ -288,7 +290,7 @@ Status arrow_result_set::process_batch(size_t rows_in_batch, std::vector<std::un
288290
ARROW_RETURN_NOT_OK(append_to_bool_builder(rows_in_batch, columns[i], buffers[i].get(), valid_bytes.data()));
289291
break;
290292
case turbodbc::type_code::timestamp:
291-
ARROW_RETURN_NOT_OK(append_to_timestamp_builder(rows_in_batch, columns[i], buffers[i].get(), valid_bytes.data()));
293+
ARROW_RETURN_NOT_OK(append_to_timestamp_builder(rows_in_batch, columns[i], buffers[i].get(), valid_bytes.data(), truncate_timestamps_));
292294
break;
293295
case turbodbc::type_code::date:
294296
ARROW_RETURN_NOT_OK(append_to_date_builder(rows_in_batch, columns[i], buffers[i].get(), valid_bytes.data()));

cpp/turbodbc_arrow/Library/src/python_bindings.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ using turbodbc_arrow::arrow_result_set;
99
namespace {
1010

1111
arrow_result_set make_arrow_result_set(std::shared_ptr<turbodbc::result_sets::result_set> result_set_pointer,
12-
bool strings_as_dictionary, bool adaptive_integers)
12+
bool strings_as_dictionary, bool adaptive_integers, bool truncate_timestamps)
1313
{
14-
return arrow_result_set(*result_set_pointer, strings_as_dictionary, adaptive_integers);
14+
return arrow_result_set(*result_set_pointer, strings_as_dictionary, adaptive_integers, truncate_timestamps);
1515
}
1616

1717
void set_arrow_parameters(turbodbc::cursor & cursor, pybind11::object const & pyarrow_table)

cpp/turbodbc_arrow/Library/turbodbc_arrow/arrow_result_set.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class PYBIND11_EXPORT arrow_result_set {
2727
* in the base result set in a row-based fashion
2828
*/
2929
arrow_result_set(turbodbc::result_sets::result_set & base, bool strings_as_dictionary,
30-
bool adaptive_integers);
30+
bool adaptive_integers, bool truncate_timestamps);
3131

3232
/**
3333
* @brief Retrieve a native (C++) Arrow Table which contains
@@ -58,6 +58,7 @@ class PYBIND11_EXPORT arrow_result_set {
5858
turbodbc::result_sets::result_set & base_result_;
5959
bool strings_as_dictionary_;
6060
bool adaptive_integers_;
61+
bool truncate_timestamps_;
6162
};
6263

6364
}

cpp/turbodbc_arrow/Test/tests/arrow_result_set_test.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ namespace {
2929

3030
bool const strings_as_strings = false;
3131
bool const strings_as_dictionaries = true;
32+
bool const truncate_timestamps = true;
3233

3334
bool const plain_integers = false;
3435
bool const compressed_integers = true;
@@ -129,7 +130,7 @@ class ArrowResultSetTest : public ::testing::Test {
129130
auto schema = std::make_shared<arrow::Schema>(expected_fields);
130131
std::shared_ptr<arrow::Table> expected_table = arrow::Table::Make(schema, expected_arrays);
131132

132-
turbodbc_arrow::arrow_result_set ars(rs, strings_as_dictionary, adaptive_integers);
133+
turbodbc_arrow::arrow_result_set ars(rs, strings_as_dictionary, adaptive_integers, truncate_timestamps);
133134
std::shared_ptr<arrow::Table> table;
134135
ASSERT_OK(ars.fetch_all_native(&table, false));
135136
ASSERT_TRUE(expected_table->Equals(*table));
@@ -151,7 +152,7 @@ TEST_F(ArrowResultSetTest, SimpleSchemaConversion)
151152
"int_column", turbodbc::type_code::integer, size_unimportant, true}};
152153
EXPECT_CALL(rs, do_get_column_info()).WillRepeatedly(testing::Return(expected));
153154

154-
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers);
155+
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers, truncate_timestamps);
155156
auto schema = ars.schema();
156157
ASSERT_EQ(schema->num_fields(), 1);
157158
auto field = schema->field(0);
@@ -191,7 +192,7 @@ TEST_F(ArrowResultSetTest, AllTypesSchemaConversion)
191192
std::make_shared<arrow::Field>("nonnull_int_column", arrow::int64(), false)
192193
};
193194

194-
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers);
195+
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers, truncate_timestamps);
195196
auto schema = ars.schema();
196197

197198
ASSERT_EQ(schema->num_fields(), 12);
@@ -225,7 +226,7 @@ TEST_F(ArrowResultSetTest, SingleBatchSingleColumnResultSetConversion)
225226
EXPECT_CALL(rs, do_get_buffers()).WillOnce(testing::Return(expected_buffers));
226227
EXPECT_CALL(rs, do_fetch_next_batch()).WillOnce(testing::Return(OUTPUT_SIZE)).WillOnce(testing::Return(0));
227228

228-
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers);
229+
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers, truncate_timestamps);
229230
std::shared_ptr<arrow::Table> table;
230231
ASSERT_OK(ars.fetch_all_native(&table, false));
231232
ASSERT_TRUE(expected_table->Equals(*table));

python/turbodbc/cursor.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ def _numpy_batch_generator(self):
307307
first_run = False
308308
yield result_batch
309309

310-
def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False):
310+
def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False, truncate_timestamps=False):
311311
"""
312312
Fetches rows in the active result set generated with ``execute()`` or
313313
``executemany()`` as an iterable of arrow tables.
@@ -320,6 +320,8 @@ def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False
320320
smallest possible integer type in which all values can be
321321
stored. Be aware that here the type depends on the resulting
322322
data.
323+
:param truncate_timestamps: If true, instead of throwing if a timestamp is not between
324+
the valid years 1400 and 9999, a truncated value will be returned.
323325
324326
:return: generator of ``pyarrow.Table``
325327
"""
@@ -329,7 +331,8 @@ def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False
329331
rs = make_arrow_result_set(
330332
self.impl.get_result_set(),
331333
strings_as_dictionary,
332-
adaptive_integers)
334+
adaptive_integers,
335+
truncate_timestamps)
333336
first_run = True
334337
while True:
335338
table = rs.fetch_next_batch()
@@ -341,7 +344,7 @@ def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False
341344
else:
342345
raise Error(_NO_ARROW_SUPPORT_MSG)
343346

344-
def fetchallarrow(self, strings_as_dictionary=False, adaptive_integers=False):
347+
def fetchallarrow(self, strings_as_dictionary=False, adaptive_integers=False, truncate_timestamps=False):
345348
"""
346349
Fetches all rows in the active result set generated with ``execute()`` or
347350
``executemany()``.
@@ -355,6 +358,9 @@ def fetchallarrow(self, strings_as_dictionary=False, adaptive_integers=False):
355358
stored. Be aware that here the type depends on the resulting
356359
data.
357360
361+
:param truncate_timestamps: If true, instead of throwing if a timestamp is not between
362+
the valid years 1400 and 9999, a truncated value will be returned.
363+
358364
:return: ``pyarrow.Table``
359365
"""
360366
self._assert_valid_result_set()
@@ -363,7 +369,8 @@ def fetchallarrow(self, strings_as_dictionary=False, adaptive_integers=False):
363369
return make_arrow_result_set(
364370
self.impl.get_result_set(),
365371
strings_as_dictionary,
366-
adaptive_integers).fetch_all()
372+
adaptive_integers,
373+
truncate_timestamps).fetch_all()
367374
else:
368375
raise Error(_NO_ARROW_SUPPORT_MSG)
369376

0 commit comments

Comments
 (0)