Skip to content

Commit 68a22e4

Browse files
authored
Fix entire column default initialize for arrow (#2580)
Arrow uses chunked buffers but default_initialize assumed that data is contiguous, which was causing segfaults. Adds: - Test which reproduced the segfault - Fix by using `NullValueReducer::finalize` - Change `Column::default_initialize_rows` to use `bytes_at`, which does the correct boundary checks - Adds a TODO comment to replace all `ptr_cast` calls with `bytes_at`. Not done in this PR to avoid the risk of touching very low level code. #### Reference Issues/PRs <!--Example: Fixes #1234. See also #3456.--> #### What does this implement or fix? #### Any other comments? #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent 78f5d2e commit 68a22e4

File tree

5 files changed

+32
-6
lines changed

5 files changed

+32
-6
lines changed

cpp/arcticdb/column_store/chunked_buffer.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,9 @@ class ChunkedBufferImpl {
386386

387387
template<typename T>
388388
T *ptr_cast(size_t pos_bytes, size_t required_bytes) {
389+
// TODO: This check doesn't verify that we're not overreaching outside of block boundaries.
390+
// We should instead use `bytes_at` which does the correct check like so:
391+
// return reinterpret_cast<T *>(bytes_at(pos_bytes, required_bytes))
389392
check_bytes(pos_bytes, required_bytes);
390393
return reinterpret_cast<T *>(&operator[](pos_bytes));
391394
}

cpp/arcticdb/column_store/column.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ void Column::default_initialize_rows(size_t start_pos, size_t num_rows, bool ens
386386
if (ensure_alloc) {
387387
data_.ensure<uint8_t>(bytes);
388388
}
389-
auto type_ptr = data_.ptr_cast<RawType>(start_pos, bytes);
389+
// `ptr_cast` doesn't work if we default_initialize bytes which span across multiple blocks, so use `bytes_at` instead.
390+
auto type_ptr = reinterpret_cast<RawType *>(data_.bytes_at(start_pos * sizeof(RawType), bytes));
390391
util::initialize<T>(reinterpret_cast<uint8_t*>(type_ptr), bytes, default_value);
391392
if (ensure_alloc) {
392393
data_.commit();

cpp/arcticdb/pipeline/read_frame.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,8 @@ struct ReduceColumnTask : async::BaseTask {
925925
auto& prev_buffer = column.buffer();
926926
swap(prev_buffer, new_buffer);
927927
} else {
928-
column.default_initialize_rows(0, frame_.row_count(), false, default_value);
928+
NullValueReducer null_reducer{column, context_, frame_, shared_data_, handler_data_, read_options_.output_format(), default_value};
929+
null_reducer.finalize();
929930
}
930931
}
931932
} else if (column_data != slice_map_->columns_.end()) {

cpp/arcticdb/util/cursored_buffer.hpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,11 @@ struct CursoredBuffer {
141141
}
142142

143143
uint8_t* bytes_at(size_t bytes, size_t required) {
144-
util::check(bytes + required <= buffer_.bytes(), "Bytes overflow, can't write {} bytes at position {} in buffer of size {}", required, bytes, buffer_.bytes());
145-
return &buffer_[bytes];
144+
return buffer_.bytes_at(bytes, required);
146145
}
147146

148147
const uint8_t* bytes_at(size_t bytes, size_t required) const {
149-
util::check(bytes + required <= buffer_.bytes(), "Bytes overflow, can't write {} bytes at position {} in buffer of size {}", required, bytes, buffer_.bytes());
150-
return &buffer_[bytes];
148+
return buffer_.bytes_at(bytes, required);
151149
}
152150

153151
void clear() {

python/tests/unit/arcticdb/version_store/test_arrow.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,3 +566,26 @@ def test_arrow_sparse_floats_hypothesis(lmdb_version_store_arrow, df, rows_per_s
566566
expected = pa.concat_tables([pa.Table.from_pandas(row_slice) for row_slice in row_slices])
567567
received = lib.read(sym).data
568568
assert expected.equals(received)
569+
570+
571+
@pytest.mark.parametrize(
572+
"type_to_drop", [pa.int64(), pa.float64(), pa.large_string()]
573+
)
574+
def test_arrow_dynamic_schema_filtered_column(lmdb_version_store_dynamic_schema_v1, type_to_drop):
575+
lib = lmdb_version_store_dynamic_schema_v1
576+
lib.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
577+
sym = "sym"
578+
column_to_drop = pa.array(["a", "b"], type_to_drop) if type_to_drop == pa.large_string() else pa.array([1, 2], type_to_drop)
579+
table_1 = pa.table({"col": pa.array([0, 1])})
580+
table_2 = pa.table({"col": pa.array([5, 6]), "col_to_drop": column_to_drop})
581+
table_3 = pa.table({"col": pa.array([2, 3])})
582+
# TODO: Remove to_pandas() when we support writing Arrow structures directly
583+
lib.write(sym, table_1.to_pandas())
584+
lib.append(sym, table_2.to_pandas())
585+
lib.append(sym, table_3.to_pandas())
586+
expected = pa.concat_tables([table_1, table_2, table_3], promote_options="permissive")
587+
expected = expected.filter(pa.compute.field("col") < 5)
588+
q = QueryBuilder()
589+
q = q[q["col"] < 5]
590+
received = stringify_dictionary_encoded_columns(lib.read(sym, query_builder=q).data)
591+
assert expected.equals(received)

0 commit comments

Comments
 (0)