
Commit dd746f0

Arrow fix string cols with nones and nans (#2586)
#### What does this implement or fix?

Allow reading into Arrow string columns containing Nones and NaNs.
1 parent 5bbe6f3 commit dd746f0

File tree

- cpp/arcticdb/arrow/arrow_handlers.cpp
- cpp/arcticdb/pipeline/column_mapping.hpp
- cpp/arcticdb/pipeline/read_frame.cpp
- python/tests/unit/arcticdb/version_store/test_arrow.py

4 files changed: +127 -70 lines

cpp/arcticdb/arrow/arrow_handlers.cpp

Lines changed: 42 additions & 22 deletions
```diff
@@ -57,9 +57,6 @@ void ArrowStringHandler::convert_type(
         const std::shared_ptr<StringPool>& string_pool) const {
     using ArcticStringColumnTag = ScalarTagType<DataTypeTag<DataType::UTF_DYNAMIC64>>;
     auto input_data = source_column.data();
-    auto pos = input_data.cbegin<ArcticStringColumnTag>();
-    const auto end = input_data.cend<ArcticStringColumnTag>();
-
     struct DictEntry {
         int32_t offset_buffer_pos_;
         int64_t string_buffer_pos_;
@@ -75,29 +72,52 @@
     int32_t unique_offset_count = 0;
     auto dest_ptr = reinterpret_cast<int32_t*>(dest_column.bytes_at(mapping.offset_bytes_, source_column.row_count() * sizeof(int32_t)));
 
+    util::BitSet bitset;
+    util::BitSet::bulk_insert_iterator inserter(bitset);
+    const auto end = input_data.cend<ArcticStringColumnTag, IteratorType::ENUMERATED>();
     // First go through the source column once to compute the size of offset and string buffers.
-    while(pos != end) {
-        auto [entry, is_emplaced] = unique_offsets.try_emplace(*pos, DictEntry{unique_offset_count, bytes, string_pool->get_const_view(*pos)});
-        if(is_emplaced) {
-            bytes += entry->second.strv.size();
-            unique_offsets_in_order.push_back(*pos);
-            ++unique_offset_count;
+    // TODO: This can't be right if the column was sparse, as it has only been decoded, not expanded
+    for (auto en = input_data.cbegin<ArcticStringColumnTag, IteratorType::ENUMERATED>(); en != end; ++en) {
+        if (is_a_string(en->value())) {
+            auto [entry, is_emplaced] = unique_offsets.try_emplace(en->value(), DictEntry{static_cast<int32_t>(unique_offset_count), bytes, string_pool->get_const_view(en->value())});
+            if (is_emplaced) {
+                bytes += entry->second.strv.size();
+                unique_offsets_in_order.push_back(en->value());
+                ++unique_offset_count;
+            }
+            *dest_ptr = entry->second.offset_buffer_pos_;
+        } else {
+            inserter = en->idx();
         }
-        ++pos;
-        *dest_ptr++ = entry->second.offset_buffer_pos_;
+        ++dest_ptr;
     }
-    auto& string_buffer = dest_column.create_extra_buffer(mapping.offset_bytes_, ExtraBufferType::STRING, bytes, AllocationType::DETACHABLE);
-    auto& offsets_buffer = dest_column.create_extra_buffer(mapping.offset_bytes_, ExtraBufferType::OFFSET, (unique_offsets_in_order.size() + 1) * sizeof(int64_t), AllocationType::DETACHABLE);
-    // Then go through unique_offsets to fill up the offset and string buffers.
-    auto offsets_ptr = reinterpret_cast<int64_t*>(offsets_buffer.data());
-    auto string_ptr = reinterpret_cast<char*>(string_buffer.data());
-    for (auto unique_offset: unique_offsets_in_order) {
-        const auto& entry = unique_offsets[unique_offset];
-        *offsets_ptr++ = entry.string_buffer_pos_;
-        memcpy(string_ptr, entry.strv.data(), entry.strv.size());
-        string_ptr += entry.strv.size();
+    inserter.flush();
+    // At this point the bitset has ones where the source column contained None or NaN.
+    // Inverting it and shrinking it to the source column size then makes a sparse map for the input data.
+    bitset.invert();
+    // TODO: row_count() here won't be right when the original data was sparse, but we don't support sparse
+    // string columns yet anyway
+    bitset.resize(source_column.row_count());
+    if (bitset.count() != bitset.size()) {
+        handle_truncation(bitset, mapping.truncate_);
+        create_dense_bitmap(mapping.offset_bytes_, bitset, dest_column, AllocationType::DETACHABLE);
+    } // else there weren't any Nones or NaNs
+    // bitset.count() == 0 is the special case where all of the rows contained None or NaN. In this case, do not create
+    // the extra string and offset buffers. string_dict_from_block will then do the right thing and call minimal_strings_dict.
+    if (bitset.count() > 0) {
+        auto& string_buffer = dest_column.create_extra_buffer(mapping.offset_bytes_, ExtraBufferType::STRING, bytes, AllocationType::DETACHABLE);
+        auto& offsets_buffer = dest_column.create_extra_buffer(mapping.offset_bytes_, ExtraBufferType::OFFSET, (unique_offsets_in_order.size() + 1) * sizeof(int64_t), AllocationType::DETACHABLE);
+        // Then go through unique_offsets to fill up the offset and string buffers.
+        auto offsets_ptr = reinterpret_cast<int64_t*>(offsets_buffer.data());
+        auto string_ptr = reinterpret_cast<char*>(string_buffer.data());
+        for (auto unique_offset: unique_offsets_in_order) {
+            const auto& entry = unique_offsets[unique_offset];
+            *offsets_ptr++ = entry.string_buffer_pos_;
+            memcpy(string_ptr, entry.strv.data(), entry.strv.size());
+            string_ptr += entry.strv.size();
+        }
+        *offsets_ptr = bytes;
     }
-    *offsets_ptr = bytes;
 }
 
 TypeDescriptor ArrowStringHandler::output_type(const TypeDescriptor&) const {
```
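
For orientation, the reworked pass above does three things in one sweep: it assigns each distinct string a dictionary code in first-seen order, writes that code into the destination offsets, and records the row index of every None/NaN in a bitset that is later inverted into a validity map. Below is a minimal standalone sketch of the same bookkeeping, using std::optional<std::string> in place of ArcticDB's string-pool offsets; build_dictionary and DictResult are illustrative names, not part of the codebase.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

struct DictResult {
    std::vector<int32_t> codes;          // per-row index into `dictionary`
    std::vector<std::string> dictionary; // unique strings, in first-seen order
    std::vector<bool> validity;          // true where the row held a real string
};

// Single pass over the rows, mirroring the enumerated loop in convert_type:
// strings get a dictionary code, nulls get a cleared validity bit.
DictResult build_dictionary(const std::vector<std::optional<std::string>>& rows) {
    DictResult out;
    out.codes.resize(rows.size(), 0);
    out.validity.resize(rows.size(), false);
    std::unordered_map<std::string, int32_t> seen;
    for (size_t i = 0; i < rows.size(); ++i) {
        if (rows[i].has_value()) { // plays the role of is_a_string(en->value())
            auto [it, inserted] =
                seen.try_emplace(*rows[i], static_cast<int32_t>(out.dictionary.size()));
            if (inserted)
                out.dictionary.push_back(*rows[i]);
            out.codes[i] = it->second;
            out.validity[i] = true; // the inverse of recording en->idx() in the null bitset
        }
        // For None/NaN rows, codes[i] stays 0 and validity[i] stays false,
        // just as the real code leaves *dest_ptr untouched and marks the bitset.
    }
    return out;
}
```

Feeding it {"a", std::nullopt, "a"} yields codes {0, 0, 0}, dictionary {"a"}, and validity {true, false, true}: the null row keeps a placeholder code and is masked out by its validity bit, which mirrors how the handler above treats Nones and NaNs.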

cpp/arcticdb/pipeline/column_mapping.hpp

Lines changed: 49 additions & 0 deletions
```diff
@@ -6,6 +6,7 @@
  */
 
 #pragma once
+#include <arcticdb/column_store/column.hpp>
 #include <arcticdb/entity/types.hpp>
 #include <arcticdb/util/bitset.hpp>
 #include <optional>
@@ -103,4 +104,52 @@ struct StaticColumnMappingIterator {
     [[nodiscard]] size_t index_fieldcount() const;
 };
 
+inline void handle_truncation(
+        Column& dest_column,
+        const ColumnTruncation& truncate) {
+    if(dest_column.num_blocks() == 1 && truncate.start_ && truncate.end_) {
+        dest_column.truncate_single_block(*truncate.start_, *truncate.end_);
+    } else {
+        if(truncate.start_)
+            dest_column.truncate_first_block(*truncate.start_);
+        if(truncate.end_)
+            dest_column.truncate_last_block(*truncate.end_);
+    }
+}
+
+inline void handle_truncation(
+        Column& dest_column,
+        const ColumnMapping& mapping) {
+    handle_truncation(dest_column, mapping.truncate_);
+}
+
+inline void handle_truncation(util::BitSet& bv, const ColumnTruncation& truncate) {
+    if (truncate.start_) {
+        bv = util::truncate_sparse_map(bv, *truncate.start_, truncate.end_.value_or(bv.size()));
+    } else if (truncate.end_) {
+        // More efficient than util::truncate_sparse_map as it avoids a copy
+        bv.resize(*truncate.end_);
+    }
+}
+
+inline void create_dense_bitmap(size_t offset, const util::BitSet& sparse_map, Column& dest_column, AllocationType allocation_type) {
+    auto& sparse_buffer = dest_column.create_extra_buffer(
+        offset,
+        ExtraBufferType::BITMAP,
+        bitset_packed_size_bytes(sparse_map.size()),
+        allocation_type);
+
+    bitset_to_packed_bits(sparse_map, sparse_buffer.data());
+}
+
+inline void create_dense_bitmap_all_zeros(size_t offset, size_t num_bits, Column& dest_column, AllocationType allocation_type) {
+    auto num_bytes = bitset_packed_size_bytes(num_bits);
+    auto& sparse_buffer = dest_column.create_extra_buffer(
+        offset,
+        ExtraBufferType::BITMAP,
+        num_bytes,
+        allocation_type);
+    std::memset(sparse_buffer.data(), 0, num_bytes);
+}
+
 } // namespace arcticdb
```
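
create_dense_bitmap above delegates the actual bit packing to bitset_to_packed_bits. Assuming that helper emits an Arrow-style validity bitmap (least-significant bit first, so bit i % 8 of byte i / 8 corresponds to row i), a self-contained stand-in looks like the sketch below; pack_validity is a hypothetical name, and the real helper operates on util::BitSet rather than std::vector<bool>.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative stand-in for bitset_to_packed_bits: packs a validity vector
// into Arrow-style bytes, one bit per row, least-significant bit first.
std::vector<uint8_t> pack_validity(const std::vector<bool>& valid) {
    // (n + 7) / 8 bytes, analogous to bitset_packed_size_bytes
    std::vector<uint8_t> packed((valid.size() + 7) / 8, 0);
    for (size_t i = 0; i < valid.size(); ++i) {
        if (valid[i])
            packed[i / 8] |= static_cast<uint8_t>(1u << (i % 8));
    }
    return packed;
}
```

With the inverted bitset produced in arrow_handlers.cpp, a set bit means the row holds a real string, matching Arrow's convention that set validity bits mark non-null values.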

cpp/arcticdb/pipeline/read_frame.cpp

Lines changed: 0 additions & 48 deletions
These helpers are deleted here verbatim; they now live as inline functions in cpp/arcticdb/pipeline/column_mapping.hpp (above), where the Arrow string handler can also reach them.

```diff
@@ -243,54 +243,6 @@ void decode_index_field(
     }
 }
 
-void handle_truncation(
-        Column& dest_column,
-        const ColumnTruncation& truncate) {
-    if(dest_column.num_blocks() == 1 && truncate.start_ && truncate.end_) {
-        dest_column.truncate_single_block(*truncate.start_, *truncate.end_);
-    } else {
-        if(truncate.start_)
-            dest_column.truncate_first_block(*truncate.start_);
-        if(truncate.end_)
-            dest_column.truncate_last_block(*truncate.end_);
-    }
-}
-
-void handle_truncation(
-        Column& dest_column,
-        const ColumnMapping& mapping) {
-    handle_truncation(dest_column, mapping.truncate_);
-}
-
-void handle_truncation(util::BitSet& bv, const ColumnTruncation& truncate) {
-    if (truncate.start_) {
-        bv = util::truncate_sparse_map(bv, *truncate.start_, truncate.end_.value_or(bv.size()));
-    } else if (truncate.end_) {
-        // More efficient than util::truncate_sparse_map as it avoids a copy
-        bv.resize(*truncate.end_);
-    }
-}
-
-void create_dense_bitmap(size_t offset, const util::BitSet& sparse_map, Column& dest_column, AllocationType allocation_type) {
-    auto& sparse_buffer = dest_column.create_extra_buffer(
-        offset,
-        ExtraBufferType::BITMAP,
-        bitset_packed_size_bytes(sparse_map.size()),
-        allocation_type);
-
-    bitset_to_packed_bits(sparse_map, sparse_buffer.data());
-}
-
-void create_dense_bitmap_all_zeros(size_t offset, size_t num_bits, Column& dest_column, AllocationType allocation_type) {
-    auto num_bytes = bitset_packed_size_bytes(num_bits);
-    auto& sparse_buffer = dest_column.create_extra_buffer(
-        offset,
-        ExtraBufferType::BITMAP,
-        num_bytes,
-        allocation_type);
-    std::memset(sparse_buffer.data(), 0, num_bytes);
-}
-
 void decode_or_expand(
     const uint8_t*& data,
     Column& dest_column,
```
python/tests/unit/arcticdb/version_store/test_arrow.py

Lines changed: 36 additions & 0 deletions
```diff
@@ -93,6 +93,42 @@ def test_strings_basic(lmdb_version_store_arrow, dynamic_strings):
     assert_frame_equal_with_arrow(table, df)
 
 
+@pytest.mark.parametrize("row_range", [None, (2, 3), (2, 4), (2, 5), (2, 6), (3, 4), (3, 5), (3, 6)])
+def test_strings_with_nones_and_nans(lmdb_version_store_tiny_segment, row_range):
+    lib = lmdb_version_store_tiny_segment
+    lib.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
+    # lmdb_version_store_tiny_segment has 2 rows per segment.
+    # This column is constructed so that every 2-element permutation of strings, Nones, and NaNs is tested.
+    df = pd.DataFrame(
+        {
+            "x": [
+                "a",
+                "b",
+                "c",
+                None,
+                None,
+                "d",
+                "e",
+                np.nan,
+                np.nan,
+                "f",
+                None,
+                None,
+                None,
+                np.nan,
+                np.nan,
+                None,
+                np.nan,
+                np.nan,
+            ]
+        }
+    )
+    lib.write("arrow", df, dynamic_strings=True)
+    table = lib.read("arrow", row_range=row_range).data
+    expected = lib.read("arrow", row_range=row_range, output_format=OutputFormat.PANDAS).data
+    assert_frame_equal_with_arrow(table, expected)
+
+
 @pytest.mark.skipif(WINDOWS, reason="Fixed-width string columns not supported on Windows")
 def test_fixed_width_strings(lmdb_version_store_arrow):
     lib = lmdb_version_store_arrow
```
