
Commit 803c91e

[10026766759] Correct sparse handling for Aggregation clauses (#2644)
#### Reference Issues/PRs

Monday ref: 10026766759

#### What does this implement or fix?

- Makes Aggregation clauses like `Mean` and `Count` respect input column sparsity
- Fixes `CopyToBufferTask` to respect sparsity for arrow
- Adds a similar test for resampling
- Adds an xfail test for monday issue: 10029194063

#### Any other comments?

Commits can be reviewed individually

#### Checklist

<details>
<summary>Checklist for code changes...</summary>

- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?
</details>
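For orientation, the user-visible behaviour this targets can be sketched as follows. This is a minimal sketch in the style of the tests below, not part of the change: `lib` stands in for a dynamic-schema library handle such as the `lmdb_version_store_dynamic_schema_v1` fixture with Arrow output enabled, and the symbol/column names are illustrative.

```python
import pandas as pd
from arcticdb import QueryBuilder

# First slice has the column, second does not, so under dynamic schema
# "vals" is entirely missing for the second slice's rows.
lib.append("sym", pd.DataFrame({"group": [0, 1], "vals": [1.0, 2.0]}), write_if_missing=True)
lib.append("sym", pd.DataFrame({"group": [2, 2]}))

q = QueryBuilder()
q = q.groupby("group").agg({"vals": "mean"})
table = lib.read("sym", query_builder=q).data

# Before this change, group 2's mean could come back default-initialized in
# Arrow output; with the fix its validity bit is unset, i.e. the value is null.
```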
1 parent 7acd347 commit 803c91e

File tree

8 files changed: +270 additions, -121 deletions


cpp/arcticdb/arrow/array_from_block.hpp

Lines changed: 5 additions & 0 deletions
```diff
@@ -19,6 +19,11 @@ inline std::optional<sparrow::validity_bitmap> create_validity_bitmap(
 ) {
     if (column.has_extra_buffer(offset, ExtraBufferType::BITMAP)) {
         auto& bitmap_buffer = column.get_extra_buffer(offset, ExtraBufferType::BITMAP);
+        util::check(
+                bitmap_buffer.blocks().size() == 1,
+                "Expected a single block bitmap extra buffer but got {} blocks",
+                bitmap_buffer.blocks().size()
+        );
         return sparrow::validity_bitmap{reinterpret_cast<uint8_t*>(bitmap_buffer.block(0)->release()), bitmap_size};
     } else {
         return std::nullopt;
```
cpp/arcticdb/processing/test/test_clause.cpp

Lines changed: 13 additions & 13 deletions
```diff
@@ -112,12 +112,7 @@ void check_column(arcticdb::SegmentInMemory segment, std::string_view column_name
     ASSERT_EQ(dt, column.type().data_type());
     for (std::size_t idx = 0u; idx < ugv; ++idx) {
         if constexpr (std::is_floating_point_v<T>) {
-            const T val = column.scalar_at<T>(idx).value();
-            if (std::isnan(val)) {
-                ASSERT_TRUE(std::isnan(f(idx)));
-            } else {
-                ASSERT_EQ(f(idx), val);
-            }
+            ASSERT_EQ(f(idx), column.scalar_at<T>(idx));
         } else {
             ASSERT_EQ(f(idx), column.scalar_at<T>(idx));
         }
@@ -192,17 +187,22 @@ TEST(Clause, AggregationSparseColumn) {
         return idx % 2 == 0 ? 450 + 10 * idx : 0;
     });
     check_column<int64_t>(*segments[0], "min_int", unique_grouping_values, [](size_t idx) -> std::optional<int64_t> {
-        return idx % 2 == 0 ? std::optional{static_cast<int64_t>(idx)} : std::nullopt;
+        return idx % 2 == 0 ? std::make_optional<int64_t>(idx) : std::nullopt;
     });
     check_column<int64_t>(*segments[0], "max_int", unique_grouping_values, [](size_t idx) -> std::optional<int64_t> {
-        return idx % 2 == 0 ? std::optional{static_cast<int64_t>(90 + idx)} : std::nullopt;
-    });
-    check_column<double>(*segments[0], "mean_int", unique_grouping_values, [](size_t idx) -> double {
-        return idx % 2 == 0 ? 45 + idx : std::numeric_limits<double>::quiet_NaN();
+        return idx % 2 == 0 ? std::make_optional<int64_t>(90 + idx) : std::nullopt;
     });
-    check_column<uint64_t>(*segments[0], "count_int", unique_grouping_values, [](size_t idx) -> uint64_t {
-        return idx % 2 == 0 ? 10 : 0;
+    check_column<double>(*segments[0], "mean_int", unique_grouping_values, [](size_t idx) -> std::optional<double> {
+        return idx % 2 == 0 ? std::make_optional<double>(45 + idx) : std::nullopt;
     });
+    check_column<uint64_t>(
+            *segments[0],
+            "count_int",
+            unique_grouping_values,
+            [](size_t idx) -> std::optional<uint64_t> {
+                return idx % 2 == 0 ? std::make_optional<uint64_t>(10) : std::nullopt;
+            }
+    );
 }
 
 TEST(Clause, AggregationSparseGroupby) {
```
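The reworked expectations return `std::optional` throughout, so a group that received no input values is `std::nullopt` rather than a sentinel `NaN` (for `mean_int`) or `0` (for `count_int`). The same expectations restated as a Python sketch, with `None` playing the role of `std::nullopt`:

```python
# Expected aggregates in AggregationSparseColumn: only even group indices
# received any input values for the sparse "int" column.
def expected_mean_int(idx: int):
    return 45 + idx if idx % 2 == 0 else None  # previously NaN marked "missing"

def expected_count_int(idx: int):
    return 10 if idx % 2 == 0 else None  # previously 0 marked "missing"

assert expected_mean_int(2) == 47 and expected_mean_int(3) is None
```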

cpp/arcticdb/processing/test/test_unsorted_aggregation.cpp

Lines changed: 6 additions & 2 deletions
```diff
@@ -87,7 +87,7 @@ class AggregationResult : public ::testing::TestWithParam<DataType> {
         if constexpr (is_bool_type(InputTypeTag::data_type())) {
             return std::array{2 / 3.0, 0.0, 1.0, 1 / 3.0};
         } else if constexpr (is_empty_type(InputTypeTag::data_type())) {
-            return std::array{0.0, 0.0, 0.0};
+            return std::array<double, 0>{};
         }
     }
 
@@ -148,7 +148,11 @@ TEST_P(AggregationResult, Mean) {
     ASSERT_EQ(result.field(0).type(), make_scalar_type(OutputDataTypeTag::data_type()));
     ASSERT_EQ(result.field(0).name(), "output");
     const Column& aggregated_column = result.column(0);
-    ASSERT_EQ(aggregated_column.row_count(), group_count);
+    if constexpr (!is_empty_type(TypeTag::data_type)) {
+        ASSERT_EQ(aggregated_column.row_count(), group_count);
+    } else {
+        ASSERT_EQ(aggregated_column.row_count(), 0);
+    }
     constexpr static std::array expected = get_expected_result_mean<InputDataTypeTag>();
     Column::for_each_enumerated<OutputDataTypeTag>(aggregated_column, [&](const auto& row) {
         ASSERT_EQ(row.value(), expected[row.idx()]);
```
cpp/arcticdb/processing/unsorted_aggregation.cpp

Lines changed: 124 additions & 88 deletions
Large diffs are not rendered by default.

cpp/arcticdb/processing/unsorted_aggregation.hpp

Lines changed: 4 additions & 0 deletions
```diff
@@ -118,6 +118,7 @@ class MeanAggregatorData : private AggregatorDataBase {
     };
     std::vector<Fraction> fractions_;
     std::optional<DataType> data_type_;
+    util::BitMagic sparse_map_;
 };
 
 class CountAggregatorData : private AggregatorDataBase {
@@ -131,6 +132,7 @@ class CountAggregatorData : private AggregatorDataBase {
 
 private:
     std::vector<uint64_t> aggregated_;
+    util::BitMagic sparse_map_;
 };
 
 class FirstAggregatorData : private AggregatorDataBase {
@@ -146,6 +148,7 @@ class FirstAggregatorData : private AggregatorDataBase {
     std::optional<DataType> data_type_;
 
     std::unordered_set<size_t> groups_cache_;
+    util::BitMagic sparse_map_;
 };
 
 class LastAggregatorData : private AggregatorDataBase {
@@ -161,6 +164,7 @@ class LastAggregatorData : private AggregatorDataBase {
    std::optional<DataType> data_type_;
 
    std::unordered_set<size_t> groups_cache_;
+   util::BitMagic sparse_map_;
 };
 
 template<class AggregatorData>
```
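Each of these aggregators now records which groups actually received at least one input value in a `util::BitMagic` bitmap (a compressed bitvector), so finalization can emit a sparse output column. A toy Python model of the bookkeeping, with a plain list standing in for the bitmap:

```python
# Toy model: one accumulator slot per group, plus a presence bitmap.
num_groups = 4
aggregated = [0] * num_groups
sparse_map = [False] * num_groups  # stands in for util::BitMagic

def aggregate(group_id: int, value: int) -> None:
    aggregated[group_id] += value
    sparse_map[group_id] = True  # this group has real data

for group_id, value in [(0, 10), (2, 5), (2, 7)]:
    aggregate(group_id, value)

# On finalize, groups never marked present (1 and 3 here) become nulls in the
# output instead of reporting a default-initialized 0.
assert sparse_map == [True, False, True, False]
```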

cpp/arcticdb/version/version_core.cpp

Lines changed: 48 additions & 11 deletions
```diff
@@ -1470,6 +1470,25 @@ static void check_incompletes_index_ranges_dont_overlap(
     }
 }
 
+void init_sparse_dst_column_before_copy(
+        Column& dst_column, size_t offset, size_t num_rows, size_t dst_rawtype_size, OutputFormat output_format,
+        const std::optional<util::BitSet>& src_sparse_map, const std::optional<Value>& default_value
+) {
+    if (output_format != OutputFormat::ARROW || default_value.has_value()) {
+        auto total_size = dst_rawtype_size * num_rows;
+        auto dst_ptr = dst_column.bytes_at(offset, total_size);
+        dst_column.type().visit_tag([&](auto dst_desc_tag) {
+            util::initialize<decltype(dst_desc_tag)>(dst_ptr, total_size, default_value);
+        });
+    } else {
+        if (src_sparse_map.has_value()) {
+            create_dense_bitmap(offset, src_sparse_map.value(), dst_column, AllocationType::DETACHABLE);
+        } else {
+            create_dense_bitmap_all_zeros(offset, num_rows, dst_column, AllocationType::DETACHABLE);
+        }
+    }
+}
+
 void copy_frame_data_to_buffer(
         SegmentInMemory& destination, size_t target_index, SegmentInMemory& source, size_t source_index,
         const RowRange& row_range, DecodePathData shared_data, std::any& handler_data, OutputFormat output_format,
@@ -1510,10 +1529,9 @@
         };
         handler->convert_type(src_column, dst_column, mapping, shared_data, handler_data, source.string_pool_ptr());
     } else if (is_empty_type(src_column.type().data_type())) {
-        // TODO: For arrow we want to set validity bitmaps instead of `initialize`ing
-        dst_column.type().visit_tag([&](auto dst_desc_tag) {
-            util::initialize<decltype(dst_desc_tag)>(dst_ptr, total_size, default_value);
-        });
+        init_sparse_dst_column_before_copy(
+                dst_column, offset, num_rows, dst_rawtype_size, output_format, std::nullopt, default_value
+        );
         // Do not use src_column.is_sparse() here, as that misses columns that are dense, but have fewer than num_rows
         // values
     } else if (src_column.opt_sparse_map().has_value() &&
@@ -1524,8 +1542,15 @@
             using dst_type_info = ScalarTypeInfo<decltype(dst_tag)>;
             typename dst_type_info::RawType* typed_dst_ptr =
                     reinterpret_cast<typename dst_type_info::RawType*>(dst_ptr);
-            // TODO: For arrow we want to set validity bitmaps instead of `initialize`ing
-            util::initialize<typename dst_type_info::TDT>(dst_ptr, num_rows * dst_rawtype_size, default_value);
+            init_sparse_dst_column_before_copy(
+                    dst_column,
+                    offset,
+                    num_rows,
+                    dst_rawtype_size,
+                    output_format,
+                    src_column.opt_sparse_map(),
+                    default_value
+            );
             details::visit_type(src_column.type().data_type(), [&](auto src_tag) {
                 using src_type_info = ScalarTypeInfo<decltype(src_tag)>;
                 Column::for_each_enumerated<typename src_type_info::TDT>(
@@ -1548,8 +1573,15 @@
                 dst_ptr += row_count * sizeof(SourceType);
             }
         } else {
-            // TODO: For arrow we want to set validity bitmaps instead of `initialize`ing
-            util::initialize<SourceTDT>(dst_ptr, num_rows * dst_rawtype_size, default_value);
+            init_sparse_dst_column_before_copy(
+                    dst_column,
+                    offset,
+                    num_rows,
+                    dst_rawtype_size,
+                    output_format,
+                    src_column.opt_sparse_map(),
+                    default_value
+            );
             SourceType* typed_dst_ptr = reinterpret_cast<SourceType*>(dst_ptr);
             Column::for_each_enumerated<SourceTDT>(src_column, [&](const auto& row) {
                 typed_dst_ptr[row.idx()] = row.value();
@@ -1580,16 +1612,21 @@
         // one with float32 dtype and one with dtype:
         // common_type(common_type(uint16, int8), float32) = common_type(int32, float32) = float64
         details::visit_type(dst_column.type().data_type(), [&](auto dest_desc_tag) {
-            using dst_type_info = ScalarTypeInfo<decltype(dest_desc_tag)>;
             using DestinationRawType = typename decltype(dest_desc_tag)::DataTypeTag::raw_type;
             auto typed_dst_ptr = reinterpret_cast<DestinationRawType*>(dst_ptr);
             details::visit_type(src_column.type().data_type(), [&](auto src_desc_tag) {
                 using source_type_info = ScalarTypeInfo<decltype(src_desc_tag)>;
                 if constexpr (std::is_arithmetic_v<typename source_type_info::RawType> &&
                               std::is_arithmetic_v<DestinationRawType>) {
                     if (src_column.is_sparse()) {
-                        util::initialize<typename dst_type_info::TDT>(
-                                dst_ptr, num_rows * dst_rawtype_size, default_value
+                        init_sparse_dst_column_before_copy(
+                                dst_column,
+                                offset,
+                                num_rows,
+                                dst_rawtype_size,
+                                output_format,
+                                src_column.opt_sparse_map(),
+                                default_value
                         );
                         Column::for_each_enumerated<typename source_type_info::TDT>(src_column, [&](const auto& row) {
                             typed_dst_ptr[row.idx()] = row.value();
```
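`init_sparse_dst_column_before_copy` centralizes the choice the removed TODOs pointed at: for non-Arrow output, or whenever an explicit `default_value` is supplied, the destination rows are still value-initialized; for Arrow output a dense validity bitmap is attached instead, copied from the source sparse map when there is one and all-zeros when the slice is entirely missing. A toy Python model of that branch (names mirror the C++; this is not the actual implementation):

```python
from typing import Optional, Sequence

def init_dst_model(output_format: str, default_value, src_sparse_map: Optional[Sequence[bool]], num_rows: int):
    """Return (values, validity) the way the destination column would be set up."""
    if output_format != "ARROW" or default_value is not None:
        # Pandas-style output or an explicit fill: initialize every row with
        # the default so missing values read back as NaN/0/default_value.
        return [default_value] * num_rows, None
    if src_sparse_map is not None:
        # Arrow output: reuse the source's presence bits as the validity bitmap.
        return [None] * num_rows, list(src_sparse_map)
    # Entirely missing slice: all-zeros bitmap, i.e. every row is null.
    return [None] * num_rows, [False] * num_rows

# Sparse source with rows 0 and 2 present, Arrow output, no fill value:
assert init_dst_model("ARROW", None, [True, False, True], 3)[1] == [True, False, True]
```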

python/tests/unit/arcticdb/version_store/test_append.py

Lines changed: 12 additions & 0 deletions
```diff
@@ -750,3 +750,15 @@ def test_append_series_with_different_row_range_index_name(lmdb_version_store_dy
     # See Monday 9797097831, it would be best to require that index names are always matching. This is the case for
     # datetime index because it's a physical column. It's a potentially breaking change.
     assert lib.read("sym").data.index.name == "index_name_2"
+
+
+@pytest.mark.xfail(reason="Wrong normalization metadata update. Monday ref: 10029194063")
+def test_append_no_columns(lmdb_version_store_dynamic_schema_v1):
+    lib = lmdb_version_store_dynamic_schema_v1
+    to_write = pd.DataFrame({"col": [1, 2, 3]}, index=pd.date_range(pd.Timestamp(2025, 1, 1), periods=3))
+    to_append = pd.DataFrame({}, index=pd.date_range(pd.Timestamp(2025, 1, 4), periods=3))
+    lib.write("sym", to_write)
+    lib.append("sym", to_append)
+    expected = pd.concat([to_write, to_append])
+    result = lib.read("sym").data
+    assert_frame_equal(result, expected)
```

python/tests/unit/arcticdb/version_store/test_arrow.py

Lines changed: 58 additions & 7 deletions
```diff
@@ -757,12 +757,63 @@ def test_aggregation_empty_slices(lmdb_version_store_dynamic_schema_v1):
     table = lib.read(sym, query_builder=q).data
     # sum_col is correctly filled with 0s instead of nulls
     assert pc.count(table.column("sum_col"), mode="only_null").as_py() == 0
-    # TODO: Fix the TODOs in `CopyToBufferTask` to make num_nulls=5 as expected
-    # For this test it so happens that one present and one missing value end up in the same bucket.
-    # Copying then default initializes the missing values instead of setting the validity bitmap.
-    # assert pc.count(table.column("mean_col"), mode="only_null").as_py() == 5
-    # assert pc.count(table.column("min_col"), mode="only_null").as_py() == 5
-    # assert pc.count(table.column("max_col"), mode="only_null").as_py() == 5
-    # assert pc.count(table.column("count_col"), mode="only_null").as_py() == 5
+    assert pc.count(table.column("mean_col"), mode="only_null").as_py() == 5
+    assert pc.count(table.column("min_col"), mode="only_null").as_py() == 5
+    assert pc.count(table.column("max_col"), mode="only_null").as_py() == 5
+    assert pc.count(table.column("count_col"), mode="only_null").as_py() == 5
+    expected = lib.read(sym, query_builder=q, output_format=OutputFormat.PANDAS).data
+    assert_frame_equal_with_arrow(table, expected)
+
+
+def test_resample_empty_slices(lmdb_version_store_dynamic_schema_v1):
+    lib = lmdb_version_store_dynamic_schema_v1
+    lib.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
+    sym = "sym"
+
+    def gen_df(start, num_rows, with_columns=True):
+        data = {}
+        if with_columns:
+            data = {
+                "mean_col": np.arange(start, start + num_rows, dtype=np.float64),
+                "sum_col": np.arange(start, start + num_rows, dtype=np.float64),
+                "min_col": np.arange(start, start + num_rows, dtype=np.float64),
+                "max_col": np.arange(start, start + num_rows, dtype=np.float64),
+                "count_col": np.arange(start, start + num_rows, dtype=np.float64),
+            }
+        index = pd.date_range(pd.Timestamp(2025, 1, start), periods=num_rows)
+        return pd.DataFrame(data, index=index)
+
+    slices = [
+        gen_df(1, 3),
+        gen_df(4, 2, False),  # We expect an entirely missing slice 4th-5th
+        gen_df(6, 3),
+        gen_df(9, 5, False),  # We expect two missing slices 10th-11th and 12th-13th
+        gen_df(14, 2),
+        gen_df(16, 2, False),  # We expect one missing slice 16th-17th
+        # TODO: If we don't finish with an append with columns our normalization metadata will be broken
+        gen_df(18, 1),
+    ]
+    for df_slice in slices:
+        lib.append(sym, df_slice, write_if_missing=True)
+
+    q = QueryBuilder()
+    q.resample("2d").agg(
+        {
+            "mean_col": "mean",
+            "sum_col": "sum",
+            "min_col": "min",
+            "max_col": "max",
+            "count_col": "count",
+        }
+    )
+
+    table = lib.read(sym, query_builder=q).data
+    # sum_col is correctly filled with 0s instead of nulls
+    assert pc.count(table.column("sum_col"), mode="only_null").as_py() == 0
+    # We expect 4 entirely empty buckets
+    assert pc.count(table.column("mean_col"), mode="only_null").as_py() == 4
+    assert pc.count(table.column("min_col"), mode="only_null").as_py() == 4
+    assert pc.count(table.column("max_col"), mode="only_null").as_py() == 4
+    assert pc.count(table.column("count_col"), mode="only_null").as_py() == 4
     expected = lib.read(sym, query_builder=q, output_format=OutputFormat.PANDAS).data
     assert_frame_equal_with_arrow(table, expected)
```
