
Commit ffe58bc

[9898177828] Use default_value in NullReducer for OutputFormat::ARROW (#2633)
#### Reference Issues/PRs

Monday ref: 9898177828

#### What does this implement or fix?

When doing aggregation we explicitly default `sum=0` for slices with no underlying values. For Arrow output this means not setting the validity bitmap in that case and default-initializing the values instead. The change includes:

- A small refactor of `NullReducer` to extract the parts common to `reduce` and `finalize` into `backfill_up_to_frame_offset`
- A modification of `Column::default_initialize` to work across several blocks
- Removal of the broken `memset` method from `ChunkedBuffer`, replaced by a new `util::initialize` method which can initialize a `ChunkedBuffer` across blocks

#### Checklist

<details>
<summary>Checklist for code changes...</summary>

- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?
</details>
1 parent a10ecd7 commit ffe58bc
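The fix hinges on Arrow's null convention: a slot is null exactly when its validity bit is clear, and the values buffer underneath a cleared bit may hold anything. A `sum` over an empty slice must instead surface a real `0`, so the commit writes the default value into the buffer and skips the all-zero validity bitmap. Below is a conceptual sketch of that convention only (plain C++, not ArcticDB or Arrow library code; all names are illustrative):

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

// Conceptual sketch of the Arrow null convention (illustrative, not ArcticDB code):
// a slot is null iff its validity bit is 0, regardless of the values buffer.
int main() {
    std::vector<double> values(4, 0.0);  // default-initialized values buffer
    uint8_t validity = 0b0101;           // slots 0 and 2 valid; slots 1 and 3 null

    for (int i = 0; i < 4; ++i) {
        if ((validity >> i) & 1)
            std::printf("slot %d: %g\n", i, values[i]);
        else
            std::printf("slot %d: null\n", i);  // buffer value underneath is ignored
    }
    // For `sum` over an empty slice the commit instead leaves the values at the
    // default 0 and keeps every validity bit set, so slots read back as real 0s.
}
```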

File tree

5 files changed: +84 −81 lines

- cpp/arcticdb/column_store/chunked_buffer.hpp
- cpp/arcticdb/column_store/column.cpp
- cpp/arcticdb/pipeline/read_frame.cpp
- cpp/arcticdb/util/sparse_utils.hpp
- python/tests/unit/arcticdb/version_store/test_arrow.py


cpp/arcticdb/column_store/chunked_buffer.hpp

Lines changed: 15 additions & 15 deletions
```diff
@@ -311,7 +311,7 @@ class ChunkedBufferImpl {

     uint8_t* bytes_at(size_t pos_bytes, size_t required) {
         auto [block, pos, _] = block_and_offset(pos_bytes);
-        util::check(pos + required <= block->bytes(), "Block overflow, position {} is greater than block capacity {}", pos, block->bytes());
+        util::check(pos + required <= block->bytes(), "Block overflow, position {} is greater than block capacity {}", pos + required, block->bytes());
         return &(*block)[pos];
     }

@@ -366,21 +366,21 @@
         }
     }

-    void memset_buffer(size_t offset, size_t bytes, char value) {
-        auto [block, pos, block_index] = block_and_offset(offset);
-        while(bytes > 0) {
-            const auto size_to_write = block->bytes() - pos;
-            memset(block->data() + pos, size_to_write, value);
-            bytes -= size_to_write;
-            if(bytes > 0) {
-                ++block_index;
-                if(block_index == blocks_.size())
-                    return;
-
-                block = blocks_[block_index];
-                pos = 0;
-            }
+    // Returns a vector of continuous buffers, each designated by a pointer and size
+    // Similar to `bytes_at` but will work if the requested range spans multiple continuous blocks.
+    std::vector<std::pair<uint8_t*, size_t>> byte_blocks_at(size_t pos_bytes, size_t required_bytes) {
+        check_bytes(pos_bytes, required_bytes);
+        std::vector<std::pair<uint8_t*, size_t>> result;
+        auto [block, pos, block_index] = block_and_offset(pos_bytes);
+        while(required_bytes > 0) {
+            block = blocks_[block_index];
+            const auto size_to_write = std::min(required_bytes, block->bytes() - pos);
+            result.push_back({block->data() + pos, size_to_write});
+            required_bytes -= size_to_write;
+            ++block_index;
+            pos = 0;
         }
+        return result;
     }

     template<typename T>
```
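The heart of `byte_blocks_at` is the walk from the block containing `pos_bytes` forward, clamping each contiguous span with `std::min`. Here is a minimal, self-contained sketch of the same traversal; `SimpleChunkedBuffer` is a hypothetical stand-in for `ChunkedBufferImpl`, with the block lookup inlined in place of `block_and_offset`:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <utility>
#include <vector>

// Hypothetical stand-in for ChunkedBufferImpl: a sequence of variable-sized blocks.
struct SimpleChunkedBuffer {
    std::vector<std::vector<uint8_t>> blocks;

    // Split [pos_bytes, pos_bytes + required_bytes) into per-block contiguous
    // spans, mirroring the new byte_blocks_at.
    std::vector<std::pair<uint8_t*, size_t>> byte_blocks_at(size_t pos_bytes, size_t required_bytes) {
        std::vector<std::pair<uint8_t*, size_t>> result;
        size_t block_index = 0;
        size_t pos = pos_bytes;
        while (pos >= blocks[block_index].size())  // locate the block containing pos_bytes
            pos -= blocks[block_index++].size();
        while (required_bytes > 0) {
            auto& block = blocks[block_index];
            const auto size_to_write = std::min(required_bytes, block.size() - pos);
            result.push_back({block.data() + pos, size_to_write});
            required_bytes -= size_to_write;
            ++block_index;
            pos = 0;  // subsequent blocks are consumed from their start
        }
        return result;
    }
};

int main() {
    SimpleChunkedBuffer buffer{{std::vector<uint8_t>(8), std::vector<uint8_t>(8)}};
    // Zero a 10-byte range starting at offset 4: it spans both blocks.
    for (auto [data, size] : buffer.byte_blocks_at(4, 10))
        std::memset(data, 0, size);
}
```

Returning (pointer, size) pairs rather than a single pointer is what lets callers handle ranges that cross block boundaries, which is exactly where the removed `memset_buffer` went wrong: it never clamped `size_to_write` to the requested byte count, and it also passed `memset` its value and size arguments in the wrong order.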

cpp/arcticdb/column_store/column.cpp

Lines changed: 1 addition & 3 deletions
```diff
@@ -670,9 +670,7 @@ void Column::default_initialize_rows(size_t start_pos, size_t num_rows, bool ens
     if (ensure_alloc) {
         data_.ensure<uint8_t>(bytes);
     }
-    // This doesn't work if we default_initialize bytes which span across multiple blocks.
-    auto type_ptr = reinterpret_cast<RawType *>(data_.bytes_at(start_pos * sizeof(RawType), bytes));
-    util::initialize<T>(reinterpret_cast<uint8_t*>(type_ptr), bytes, default_value);
+    util::initialize<T>(data_.buffer(), start_pos * sizeof(RawType), bytes, default_value);
     if (ensure_alloc) {
         data_.commit();
     }
```

cpp/arcticdb/pipeline/read_frame.cpp

Lines changed: 18 additions & 30 deletions
```diff
@@ -763,38 +763,38 @@ class NullValueReducer {
         return context_row.slice_and_key().slice_.row_range.first;
     }

-    void backfill_all_zero_validity_bitmaps_up_to(std::optional<size_t> up_to_block_offset) {
+    void backfill_all_zero_validity_bitmaps_up_to(size_t up_to_block_offset) {
         // Fills up all validity bitmaps with zeros from `column_block_idx_` until reaching `up_to_block_offset`.
-        // If `up_to_block_offset` is `std::nullopt` then fills up until the end of the column.
         const auto& block_offsets = column_.block_offsets();
-        util::check(!up_to_block_offset.has_value() || up_to_block_offset.value() <= block_offsets.back(), "up_to_block_offset outside of range");
-        for (; column_block_idx_ < block_offsets.size() - 1; ++column_block_idx_) {
-            if (up_to_block_offset.has_value() && block_offsets.at(column_block_idx_) >= up_to_block_offset.value()) {
-                break;
-            }
+        util::check(up_to_block_offset <= block_offsets.back(), "up_to_block_offset {} outside of range {}", up_to_block_offset, block_offsets.back());
+        for (; column_block_idx_ < block_offsets.size() - 1 && block_offsets.at(column_block_idx_) < up_to_block_offset; ++column_block_idx_) {
             auto rows = (block_offsets.at(column_block_idx_ + 1) - block_offsets.at(column_block_idx_)) / type_bytes_;
             create_dense_bitmap_all_zeros(block_offsets.at(column_block_idx_), rows, column_, AllocationType::DETACHABLE);
         }
     }

-    void reduce(PipelineContextRow &context_row){
-        auto &slice_and_key = context_row.slice_and_key();
-        auto sz_to_advance = slice_and_key.slice_.row_range.diff();
-        auto current_pos = context_row.slice_and_key().slice_.row_range.first;
-        if (current_pos != pos_) {
-            const auto num_rows = current_pos - pos_;
+    void backfill_up_to_frame_offset(size_t up_to) {
+        if (pos_ != up_to) {
+            const auto num_rows = up_to - pos_;
             const auto start_row = pos_ - frame_.offset();
-            const auto end_row = current_pos - frame_.offset();
+            const auto end_row = up_to - frame_.offset();
             if (const std::shared_ptr<TypeHandler>& handler = get_type_handler(output_format_, column_.type()); handler) {
                 handler->default_initialize(column_.buffer(), start_row * handler->type_size(), num_rows * handler->type_size(), shared_data_, handler_data_);
-            } else if (output_format_ != OutputFormat::ARROW) {
+            } else if (output_format_ != OutputFormat::ARROW || default_value_.has_value()) {
                 // Arrow does not care what values are in the main buffer where the validity bitmap is zero
                 column_.default_initialize_rows(start_row, num_rows, false, default_value_);
             }
-            if (output_format_ == OutputFormat::ARROW) {
+            if (output_format_ == OutputFormat::ARROW && !default_value_.has_value()) {
                 backfill_all_zero_validity_bitmaps_up_to(end_row * type_bytes_);
             }
         }
+    }
+
+    void reduce(PipelineContextRow &context_row){
+        auto &slice_and_key = context_row.slice_and_key();
+        auto sz_to_advance = slice_and_key.slice_.row_range.diff();
+        auto current_pos = context_row.slice_and_key().slice_.row_range.first;
+        backfill_up_to_frame_offset(current_pos);
         pos_ = current_pos + sz_to_advance;
         if (output_format_ == OutputFormat::ARROW) {
             ++column_block_idx_;
@@ -804,20 +804,8 @@
     void finalize() {
         const auto total_rows = frame_.row_count();
         const auto end = frame_.offset() + total_rows;
-        if(pos_ != end) {
-            util::check(pos_ < end, "Overflow in finalize {} > {}", pos_, end);
-            const auto num_rows = end - pos_;
-            const auto start_row = pos_ - frame_.offset();
-            if (const std::shared_ptr<TypeHandler>& handler = get_type_handler(output_format_, column_.type()); handler) {
-                handler->default_initialize(column_.buffer(), start_row * handler->type_size(), num_rows * handler->type_size(), shared_data_, handler_data_);
-            } else if (output_format_ != OutputFormat::ARROW) {
-                // Arrow does not care what values are in the main buffer where the validity bitmap is zero
-                column_.default_initialize_rows(start_row, num_rows, false, default_value_);
-            }
-            if (output_format_ == OutputFormat::ARROW) {
-                backfill_all_zero_validity_bitmaps_up_to(std::nullopt);
-            }
-        }
+        util::check(pos_ <= end, "Overflow in finalize {} > {}", pos_, end);
+        backfill_up_to_frame_offset(end);
     }
 };
```
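The refactor is easier to follow in isolation: `reduce` backfills any gap in front of the incoming slice and then advances `pos_`, while `finalize` backfills whatever remains up to the end of the frame, both through the shared helper. A stripped-down, self-contained model of that control flow (`GapBackfiller` is a hypothetical simplification, not the real `NullValueReducer`):

```cpp
#include <cstddef>
#include <cstdio>

// Hypothetical, simplified model of NullValueReducer's shared backfill step.
// pos_ tracks the first row not yet covered by any slice; both reduce() and
// finalize() funnel through the same gap-filling helper.
struct GapBackfiller {
    size_t pos_ = 0;

    void backfill_up_to(size_t up_to) {
        if (pos_ != up_to) {
            // In ArcticDB this default-initializes rows [pos_, up_to) or, for
            // Arrow output without a default value, zeroes validity bitmaps.
            std::printf("backfill rows [%zu, %zu)\n", pos_, up_to);
        }
    }

    void reduce(size_t slice_first_row, size_t slice_row_count) {
        backfill_up_to(slice_first_row);  // fill any gap before this slice
        pos_ = slice_first_row + slice_row_count;
    }

    void finalize(size_t total_rows) {
        backfill_up_to(total_rows);       // fill any trailing gap
    }
};

int main() {
    GapBackfiller reducer;
    reducer.reduce(0, 5);   // slice covers rows [0, 5)
    reducer.reduce(10, 5);  // gap [5, 10) gets backfilled
    reducer.finalize(20);   // trailing gap [15, 20) gets backfilled
}
```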

cpp/arcticdb/util/sparse_utils.hpp

Lines changed: 9 additions & 33 deletions
```diff
@@ -86,39 +86,6 @@ void default_initialize(uint8_t* data, const size_t bytes) {
     }
 }

-template <typename TagType>
-requires util::instantiation_of<TagType, TypeDescriptorTag>
-void default_initialize(ChunkedBuffer& buffer, size_t offset, const size_t bytes, DecodePathData shared_data, std::any& handler_data) {
-    using RawType = typename TagType::DataTypeTag::raw_type;
-    const auto num_rows ARCTICDB_UNUSED = bytes / sizeof(RawType);
-    constexpr auto type = static_cast<TypeDescriptor>(TagType{});
-    constexpr auto data_type = type.data_type();
-    ColumnData column_data{&buffer, type};
-    auto pos = column_data.begin<TagType, IteratorType::REGULAR, IteratorDensity::DENSE, false>();
-    std::advance(pos, offset);
-    //auto end = column_data.begin<TagType, IteratorType::REGULAR, IteratorDensity::DENSE, false>();
-    if constexpr (is_sequence_type(data_type)) {
-        std::fill_n(pos, num_rows, not_a_string());
-    } else if constexpr (is_floating_point_type(data_type)) {
-        std::fill_n(pos, num_rows, std::numeric_limits<RawType>::quiet_NaN());
-    } else if constexpr (is_time_type(data_type)) {
-        std::fill_n(pos, num_rows, NaT);
-    } else if constexpr (is_integer_type(data_type) || is_bool_type(data_type)) {
-        buffer.memset_buffer(offset, bytes, 0);
-    } else {
-        constexpr auto type_descriptor = TagType::type_descriptor();
-        if (const std::shared_ptr<TypeHandler>& handler = arcticdb::TypeHandlerRegistry::instance()->get_handler(type_descriptor);handler) {
-            handler->default_initialize(buffer, offset, bytes, shared_data, handler_data);
-        } else {
-            internal::raise<ErrorCode::E_INVALID_ARGUMENT>(
-                "Default initialization for {} is not implemented.",
-                type_descriptor
-            );
-        }
-    }
-}
-
-
 /// Initialize a buffer either using a custom default value or using a predefined default value for the type
 /// @param[in] default_value Variant holding either a value of the raw type for the type tag or std::monostate
 template <typename TagType>
@@ -137,6 +104,15 @@ void initialize(uint8_t* data, const size_t bytes, const std::optional<Value>& d
     }
 }

+template <typename TagType>
+requires util::instantiation_of<TagType, TypeDescriptorTag>
+void initialize(ChunkedBuffer& buffer, size_t offset, size_t bytes, const std::optional<Value>& default_value) {
+    auto blocks = buffer.byte_blocks_at(offset, bytes);
+    for (auto [data, size] : blocks) {
+        initialize<TagType>(data, size, default_value);
+    }
+}
+
 [[nodiscard]] util::BitSet scan_object_type_to_sparse(
     const PyObject* const* ptr,
     size_t rows_to_write);
```
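The new overload simply fans the pointer-based `initialize<TagType>` out over the spans returned by `byte_blocks_at`. That pointer-based overload picks a type-appropriate default when no explicit `default_value` is supplied (NaN for floats, zero for integers, with ArcticDB's own sentinels for strings and timestamps). A simplified, self-contained illustration of the dispatch pattern (plain templates, not ArcticDB's type tags):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <type_traits>

// Simplified illustration of type-dispatched default initialization: floats get
// quiet NaN, integers get 0. ArcticDB additionally routes strings and timestamps
// to its own sentinel values (not_a_string(), NaT).
template <typename T>
void default_fill(T* data, std::size_t count) {
    if constexpr (std::is_floating_point_v<T>)
        std::fill_n(data, count, std::numeric_limits<T>::quiet_NaN());
    else
        std::fill_n(data, count, T{0});
}

int main() {
    double floats[4];
    std::int64_t ints[4];
    default_fill(floats, 4);  // all NaN
    default_fill(ints, 4);    // all 0
}
```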

python/tests/unit/arcticdb/version_store/test_arrow.py

Lines changed: 41 additions & 0 deletions
```diff
@@ -667,3 +667,44 @@ def test_project_dynamic_schema_complex(lmdb_version_store_dynamic_schema_v1):
     table = lib.read(sym, query_builder=q).data
     expected = lib.read(sym, query_builder=q, output_format=OutputFormat.PANDAS).data
     assert_frame_equal_with_arrow(table, expected)
+
+
+def test_aggregation_empty_slices(lmdb_version_store_dynamic_schema_v1):
+    lib = lmdb_version_store_dynamic_schema_v1
+    lib.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
+    sym = "sym"
+    df_1 = pd.DataFrame({
+        "group_col": [chr(ord("a")+i) for i in range(5)],
+        "mean_col": np.arange(0, 5, dtype=np.float64),
+        "sum_col": np.arange(0, 5, dtype=np.float64),
+        "min_col": np.arange(0, 5, dtype=np.float64),
+        "max_col": np.arange(0, 5, dtype=np.float64),
+        "count_col": np.arange(0, 5, dtype=np.float64),
+    })
+    df_2 = pd.DataFrame({
+        "group_col": [chr(ord("a")+i+10) for i in range(5)],
+    })
+    lib.write(sym, df_1, dynamic_strings=True)
+    lib.append(sym, df_2, dynamic_strings=True)
+
+    q = QueryBuilder()
+    q.groupby("group_col").agg({
+        "mean_col": "mean",
+        "sum_col": "sum",
+        "min_col": "min",
+        "max_col": "max",
+        "count_col": "count",
+    })
+
+    table = lib.read(sym, query_builder=q).data
+    # sum_col is correctly filled with 0s instead of nulls
+    assert pc.count(table.column("sum_col"), mode="only_null").as_py() == 0
+    # TODO: Fix the TODOs in `CopyToBufferTask` to make num_nulls=5 as expected
+    # For this test it so happens that one present and one missing value end up in the same bucket.
+    # Copying then default initializes the missing values instead of setting the validity bitmap.
+    # assert pc.count(table.column("mean_col"), mode="only_null").as_py() == 5
+    # assert pc.count(table.column("min_col"), mode="only_null").as_py() == 5
+    # assert pc.count(table.column("max_col"), mode="only_null").as_py() == 5
+    # assert pc.count(table.column("count_col"), mode="only_null").as_py() == 5
+    expected = lib.read(sym, query_builder=q, output_format=OutputFormat.PANDAS).data
+    assert_frame_equal_with_arrow(table, expected)
```
