
Commit 1b13f92

[9898131742] Fix arrow projection with dynamic schema (#2630)
#### Reference Issues/PRs

Monday ref: 9898131742

#### What does this implement or fix?

This PR modifies the `NullValueReducer` so that it no longer relies on the slice index; by preserving `column_block_idx_` state between row-slices it also avoids an unneeded `log(n)` search for the offset.

#### Any other comments?

The `NullValueReducer` assumed that `len(slice_and_keys) == len(row_slices_per_column)` when using `dynamic_schema=True`. That does not hold when projections are used. E.g. for the following projection our slicing would look like:

```
Given:

TD key 1:
index  A
1      1
2      2

TD key 2:
index  A  B
3      3  1
4      4  2

TD key 3:
index  B
5      3
6      4

And we do a projection like `q.apply("C", q["A"] + q["B"])`, our slicing would look like:

Slice 1: TD key 1
Slice 2: TD key 2
Slice 3:
index  C
3      4
4      6
Slice 4: TD key 3
```

#### Checklist

<details>
<summary>Checklist for code changes...</summary>

- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

</details>
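For illustration only (not part of the diff), a minimal pyarrow sketch of the example above, using the same `pa.concat_tables(..., promote_options="permissive")` call as the new tests. The table names are made up for this sketch; it shows that the projected column `C` only has values on the middle row-slice, while slices missing `A` or `B` carry nulls:

```python
import pyarrow as pa
import pyarrow.compute as pc

# The three row-slices from the example above, as written under dynamic schema.
td_key_1 = pa.table({"index": [1, 2], "A": [1, 2]})
td_key_2 = pa.table({"index": [3, 4], "A": [3, 4], "B": [1, 2]})
td_key_3 = pa.table({"index": [5, 6], "B": [3, 4]})

# Permissive promotion fills columns that are absent from a row-slice with nulls.
combined = pa.concat_tables([td_key_1, td_key_2, td_key_3], promote_options="permissive")

# The projection q.apply("C", q["A"] + q["B"]) is null wherever A or B is null,
# i.e. everywhere except the middle row-slice (values 4 and 6).
combined = combined.append_column("C", pc.add(combined.column("A"), combined.column("B")))
print(combined.to_pandas())
```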
1 parent 68603dc commit 1b13f92

File tree

4 files changed (+78 -20 lines)


cpp/arcticdb/pipeline/read_frame.cpp

Lines changed: 19 additions & 16 deletions
@@ -732,6 +732,7 @@ class NullValueReducer {
     std::shared_ptr<PipelineContext> context_;
     SegmentInMemory frame_;
     size_t pos_;
+    size_t column_block_idx_;
     DecodePathData shared_data_;
     std::any& handler_data_;
     const OutputFormat output_format_;
@@ -751,6 +752,7 @@ class NullValueReducer {
         context_(context),
         frame_(std::move(frame)),
         pos_(frame_.offset()),
+        column_block_idx_(0),
         shared_data_(std::move(shared_data)),
         handler_data_(handler_data),
         output_format_(output_format),
@@ -761,18 +763,17 @@ class NullValueReducer {
         return context_row.slice_and_key().slice_.row_range.first;
     }

-    void backfill_all_zero_validity_bitmaps(size_t offset_bytes_start, size_t offset_bytes_end_idx) {
-        // Explanation: offset_bytes_start and offset_bytes_end should both be elements of block_offsets by
-        // construction. We must add an all zeros validity bitmap for each row-slice read from storage where this
-        // column was missing, in order to correctly populate the Arrow record-batches for the output
+    void backfill_all_zero_validity_bitmaps_up_to(std::optional<size_t> up_to_block_offset) {
+        // Fills up all validity bitmaps with zeros from `column_block_idx_` until reaching `up_to_block_offset`.
+        // If `up_to_block_offset` is `std::nullopt` then fills up until the end of the column.
         const auto& block_offsets = column_.block_offsets();
-        auto start_it = std::ranges::lower_bound(block_offsets, offset_bytes_start);
-        util::check(start_it != block_offsets.cend() && *start_it == offset_bytes_start,
-                    "NullValueReducer: Failed to find offset_bytes_start {} in block_offsets {}",
-                    offset_bytes_start, block_offsets);
-        for (auto idx = static_cast<size_t>(std::distance(block_offsets.begin(), start_it)); idx < offset_bytes_end_idx; ++idx) {
-            auto rows = (block_offsets.at(idx + 1) - block_offsets.at(idx)) / type_bytes_;
-            create_dense_bitmap_all_zeros(block_offsets.at(idx), rows, column_, AllocationType::DETACHABLE);
+        util::check(!up_to_block_offset.has_value() || up_to_block_offset.value() <= block_offsets.back(), "up_to_block_offset outside of range");
+        for (; column_block_idx_ < block_offsets.size() - 1; ++column_block_idx_) {
+            if (up_to_block_offset.has_value() && block_offsets.at(column_block_idx_) >= up_to_block_offset.value()) {
+                break;
+            }
+            auto rows = (block_offsets.at(column_block_idx_ + 1) - block_offsets.at(column_block_idx_)) / type_bytes_;
+            create_dense_bitmap_all_zeros(block_offsets.at(column_block_idx_), rows, column_, AllocationType::DETACHABLE);
         }
     }

@@ -783,18 +784,20 @@ class NullValueReducer {
         if (current_pos != pos_) {
             const auto num_rows = current_pos - pos_;
             const auto start_row = pos_ - frame_.offset();
+            const auto end_row = current_pos - frame_.offset();
             if (const std::shared_ptr<TypeHandler>& handler = get_type_handler(output_format_, column_.type()); handler) {
                 handler->default_initialize(column_.buffer(), start_row * handler->type_size(), num_rows * handler->type_size(), shared_data_, handler_data_);
             } else if (output_format_ != OutputFormat::ARROW) {
                 // Arrow does not care what values are in the main buffer where the validity bitmap is zero
                 column_.default_initialize_rows(start_row, num_rows, false, default_value_);
             }
             if (output_format_ == OutputFormat::ARROW) {
-                backfill_all_zero_validity_bitmaps(start_row * type_bytes_, context_row.index());
+                backfill_all_zero_validity_bitmaps_up_to(end_row * type_bytes_);
             }
-            pos_ = current_pos + sz_to_advance;
-        } else {
-            pos_ += sz_to_advance;
+        }
+        pos_ = current_pos + sz_to_advance;
+        if (output_format_ == OutputFormat::ARROW) {
+            ++column_block_idx_;
         }
     }

@@ -812,7 +815,7 @@ class NullValueReducer {
                 column_.default_initialize_rows(start_row, num_rows, false, default_value_);
             }
             if (output_format_ == OutputFormat::ARROW) {
-                backfill_all_zero_validity_bitmaps(start_row * type_bytes_, column_.block_offsets().size() - 1);
+                backfill_all_zero_validity_bitmaps_up_to(std::nullopt);
             }
         }
     }
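A rough Python sketch (not part of the diff; the callback and parameter names are stand-ins for the C++ members above) of what the new `backfill_all_zero_validity_bitmaps_up_to` does: because `column_block_idx_` is kept as reducer state, each row-slice resumes from the block where the previous one stopped, instead of re-locating its start offset with `std::ranges::lower_bound` on every call.

```python
from typing import Callable, List, Optional

def backfill_all_zero_validity_bitmaps_up_to(
    block_offsets: List[int],                       # byte offset of each block, plus a past-the-end entry
    column_block_idx: int,                          # cursor carried across row-slices (column_block_idx_ in C++)
    type_bytes: int,                                # size in bytes of one element of the column's type
    up_to_block_offset: Optional[int],              # stop before this byte offset; None means "to the end"
    create_dense_bitmap_all_zeros: Callable[[int, int], None],  # stand-in for the C++ helper
) -> int:
    """Zero-fill validity bitmaps from the cursor up to (but not including) up_to_block_offset."""
    assert up_to_block_offset is None or up_to_block_offset <= block_offsets[-1]
    while column_block_idx < len(block_offsets) - 1:
        if up_to_block_offset is not None and block_offsets[column_block_idx] >= up_to_block_offset:
            break
        rows = (block_offsets[column_block_idx + 1] - block_offsets[column_block_idx]) // type_bytes
        create_dense_bitmap_all_zeros(block_offsets[column_block_idx], rows)
        column_block_idx += 1
    return column_block_idx  # the caller stores this back so the next row-slice resumes here
```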

cpp/arcticdb/version/version_core.cpp

Lines changed: 3 additions & 0 deletions
@@ -1381,6 +1381,7 @@ void copy_frame_data_to_buffer(
         const ColumnMapping mapping{src_column.type(), dst_column.type(), destination.field(target_index), type_size, num_rows, row_range.first, offset, total_size, target_index};
         handler->convert_type(src_column, dst_column, mapping, shared_data, handler_data, source.string_pool_ptr());
     } else if (is_empty_type(src_column.type().data_type())) {
+        // TODO: For arrow we want to set validity bitmaps instead of `initialize`ing
         dst_column.type().visit_tag([&](auto dst_desc_tag) {
             util::initialize<decltype(dst_desc_tag)>(dst_ptr, total_size, default_value);
         });
@@ -1389,6 +1390,7 @@ void copy_frame_data_to_buffer(
         details::visit_type(dst_column.type().data_type(), [&](auto dst_tag) {
             using dst_type_info = ScalarTypeInfo<decltype(dst_tag)>;
             typename dst_type_info::RawType* typed_dst_ptr = reinterpret_cast<typename dst_type_info::RawType*>(dst_ptr);
+            // TODO: For arrow we want to set validity bitmaps instead of `initialize`ing
             util::initialize<typename dst_type_info::TDT>(dst_ptr, num_rows * dst_rawtype_size, default_value);
             details::visit_type(src_column.type().data_type(), [&](auto src_tag) {
                 using src_type_info = ScalarTypeInfo<decltype(src_tag)>;
@@ -1408,6 +1410,7 @@ void copy_frame_data_to_buffer(
                 dst_ptr += row_count * sizeof(SourceType);
             }
         } else {
+            // TODO: For arrow we want to set validity bitmaps instead of `initialize`ing
             util::initialize<SourceTDT>(dst_ptr, num_rows * dst_rawtype_size, default_value);
             SourceType* typed_dst_ptr = reinterpret_cast<SourceType*>(dst_ptr);
             Column::for_each_enumerated<SourceTDT>(src_column, [&](const auto& row) {

python/arcticdb/util/test.py

Lines changed: 13 additions & 3 deletions
@@ -242,15 +242,25 @@ def assert_frame_equal_rebuild_index_first(expected: pd.DataFrame, actual: pd.Da
     assert_frame_equal(left=expected, right=actual)


-def convert_arrow_to_pandas_and_remove_categoricals(table):
+def convert_arrow_to_pandas_for_tests(table):
+    """
+    Converts pa.Table outputted via `output_format=OutputFormat.EXPERIMENTAL_ARROW` to a pd.DataFrame so it would be
+    identical to the one outputted via `output_format=OutputFormat.PANDAS`. This requires two changes:
+    - Replaces dictionary encoded string columns with regular string columns.
+    - Fills null values in int columns with zeros.
+    """
     new_table = stringify_dictionary_encoded_columns(table)
+    for i, name in enumerate(new_table.column_names):
+        if pa.types.is_integer(new_table.column(i).type):
+            new_col = new_table.column(i).fill_null(0)
+            new_table = new_table.set_column(i, name, new_col)
     return new_table.to_pandas()

 def assert_frame_equal_with_arrow(left, right, **kwargs):
     if isinstance(left, pa.Table):
-        left = convert_arrow_to_pandas_and_remove_categoricals(left)
+        left = convert_arrow_to_pandas_for_tests(left)
     if isinstance(right, pa.Table):
-        right = convert_arrow_to_pandas_and_remove_categoricals(right)
+        right = convert_arrow_to_pandas_for_tests(right)
     assert_frame_equal(left, right, **kwargs)
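A small usage sketch of the renamed helper (illustrative only; it assumes this PR's version of `arcticdb.util.test` is importable). Per the docstring above, integer nulls produced by dynamic schema are zero-filled so the frame matches the `OutputFormat.PANDAS` result:

```python
import pyarrow as pa
from arcticdb.util.test import convert_arrow_to_pandas_for_tests

# An int column with a null, as dynamic schema can produce when a row-slice lacked the column.
arrow_table = pa.table({"a": pa.array([1, 2, None], type=pa.int64())})
df = convert_arrow_to_pandas_for_tests(arrow_table)
assert df["a"].tolist() == [1, 2, 0]  # nulls backfilled with 0, matching the Pandas output format
```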

python/tests/unit/arcticdb/version_store/test_arrow.py

Lines changed: 43 additions & 1 deletion
@@ -10,14 +10,15 @@
 from arcticdb.version_store.processing import QueryBuilder
 from arcticdb.options import OutputFormat
 import pyarrow as pa
+import pyarrow.compute as pc
 from arcticdb.util.hypothesis import (
     use_of_function_scoped_fixtures_in_hypothesis_checked,
     ENDIANNESS,
     supported_string_dtypes,
     dataframe_strategy,
     column_strategy,
 )
-from arcticdb.util.test import get_sample_dataframe
+from arcticdb.util.test import get_sample_dataframe, make_dynamic
 from arcticdb_ext.storage import KeyType
 from tests.util.mark import WINDOWS

@@ -625,3 +626,44 @@ def test_arrow_dynamic_schema_filtered_column(lmdb_version_store_dynamic_schema_
     q = q[q["col"] < 5]
     received = stringify_dictionary_encoded_columns(lib.read(sym, query_builder=q).data)
     assert expected.equals(received)
+
+
+def test_project_dynamic_schema(lmdb_version_store_dynamic_schema_v1):
+    lib = lmdb_version_store_dynamic_schema_v1
+    lib.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
+    sym = "sym"
+    table_1 = pa.table({"a": pa.array([1, 2])})
+    table_2 = pa.table({"a": pa.array([3, 4]), "b": pa.array([1, 2])})
+    table_3 = pa.table({"b": pa.array([3, 4])})
+    lib.write(sym, table_1.to_pandas())
+    lib.append(sym, table_2.to_pandas())
+    lib.append(sym, table_3.to_pandas())
+    q = QueryBuilder()
+    q = q.apply("c", q["a"] * q["b"] + 10)
+    received = lib.read(sym, query_builder=q).data
+    expected = pa.concat_tables([table_1, table_2, table_3], promote_options="permissive")
+    expected_new_col = pc.add(pc.multiply(expected.column("a"), expected.column("b")), 10)
+    expected = expected.append_column("c", expected_new_col)
+    assert expected.equals(received)
+
+
+def test_project_dynamic_schema_complex(lmdb_version_store_dynamic_schema_v1):
+    lib = lmdb_version_store_dynamic_schema_v1
+    lib.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
+    sym = "sym"
+    df = pd.DataFrame({
+        "int_col_1": np.arange(0, 10, dtype=np.int16),
+        "int_col_2": np.arange(10, 20, dtype=np.int32),
+        "float_col": np.arange(20, 30, dtype=np.float64),
+    })
+    expected, slices = make_dynamic(df)
+    for df_slice in slices:
+        lib.append(sym, df_slice, write_if_missing=True)
+
+    q = QueryBuilder()
+    q = q.apply("new_float_1", q["int_col_1"] / q["float_col"] + 1)
+    q = q.apply("new_float_2", q["int_col_2"] * q["new_float_1"])
+
+    table = lib.read(sym, query_builder=q).data
+    expected = lib.read(sym, query_builder=q, output_format=OutputFormat.PANDAS).data
+    assert_frame_equal_with_arrow(table, expected)
