Commit 4f3fdfd
Fix resample empty bucket with date range
In some fairly niche cases the processing pipeline can produce empty segments:

- Resampling with dynamic schema
- A date range filter with no index values within an intersecting data key (fixed in a previous PR)

We previously tried to allocate a 0-sized memory block, which raised assertion failures. This PR simply skips allocating the 0-sized memory blocks and adds an Arrow test to verify the fix.
1 parent ea2c795 commit 4f3fdfd
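For context on why the 0-sized blocks were a problem: chunked buffers locate memory blocks by their starting byte offset, so no two blocks may share an offset. Below is a minimal illustrative sketch of that invariant; `ToyChunkedBuffer` and its members are hypothetical and not ArcticDB's actual implementation:

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <map>

// Hypothetical model of a chunked buffer: blocks are keyed by their
// starting byte offset, so the offset -> block mapping must be injective.
struct ToyChunkedBuffer {
    std::map<std::size_t, std::size_t> block_size_by_offset_;
    std::size_t bytes_ = 0;

    void add_block(std::size_t size) {
        // A 0-sized block would start at the same offset as the next block,
        // colliding on the same key. This is the invariant the commit
        // preserves by skipping the allocation of empty blocks entirely.
        assert(size > 0);
        block_size_by_offset_[bytes_] = size;
        bytes_ += size;
    }

    // Find the block containing a byte position, the way offset-driven
    // consumers would address the buffer.
    std::size_t block_offset_containing(std::size_t pos) const {
        auto it = block_size_by_offset_.upper_bound(pos);
        assert(it != block_size_by_offset_.begin());
        return std::prev(it)->first;
    }
};

int main() {
    ToyChunkedBuffer buf;
    buf.add_block(16);
    // buf.add_block(0);  // would break: next block would also start at 16
    buf.add_block(8);
    assert(buf.block_offset_containing(20) == 16);
}
```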

2 files changed (+32, -5)

cpp/arcticdb/pipeline/read_frame.cpp

Lines changed: 11 additions & 5 deletions
```diff
@@ -107,11 +107,17 @@ SegmentInMemory allocate_chunked_frame(const std::shared_ptr<PipelineContext>& c
     };
     auto handlers = TypeHandlerRegistry::instance();
-    if (row_count > 0) {
-        for (auto& column : output.columns()) {
-            auto handler = handlers->get_handler(output_format, column->type());
-            const auto data_size = data_type_size(column->type(), output_format, DataTypeMode::EXTERNAL);
-            for (auto block_row_count : block_row_counts) {
+    for (auto& column : output.columns()) {
+        auto handler = handlers->get_handler(output_format, column->type());
+        const auto data_size = data_type_size(column->type(), output_format, DataTypeMode::EXTERNAL);
+        for (auto block_row_count : block_row_counts) {
+            if (block_row_count > 0) {
+                // We can end up with empty segments from the processing pipeline, e.g. when:
+                // - Filtering a data key to the empty set (e.g. date_range = (3, 3) in a data key with no index=3)
+                // - Resampling with a date range with a bucket slice containing no indices
+                // 0 sized memory blocks would break the offset assumptions in chunked buffers, and it is fine to have
+                // number of memory blocks not equal number of segments because follow-up methods like
+                // `copy_frame_data_to_buffer` rely on offsets rather than block indices.
                 const auto bytes = block_row_count * data_size;
                 column->allocate_data(bytes);
                 column->advance_data(bytes);
```

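The new comment relies on downstream copying being driven by running row offsets rather than a one-to-one pairing of segments and memory blocks. A rough sketch of that idea, with hypothetical names (the real `copy_frame_data_to_buffer` in ArcticDB is more involved):

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

// Copy per-segment column data into one flat destination, addressed by a
// running row offset. The real destination is a chunked buffer, but the
// addressing idea is the same: an empty segment advances the offset by zero
// and contributes no memory block, and nothing downstream notices.
void copy_segments_by_offset(const std::vector<std::vector<double>>& segments,
                             std::vector<double>& dest) {
    std::size_t row_offset = 0;
    for (const auto& segment : segments) {
        if (!segment.empty()) {
            std::memcpy(dest.data() + row_offset, segment.data(),
                        segment.size() * sizeof(double));
        }
        row_offset += segment.size();
    }
}

int main() {
    // Three segments, the middle one empty (e.g. a filtered-out row slice).
    std::vector<std::vector<double>> segments{{1.0, 2.0}, {}, {3.0}};
    std::vector<double> dest(3);
    copy_segments_by_offset(segments, dest);  // dest == {1.0, 2.0, 3.0}
}
```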
python/tests/unit/arcticdb/version_store/test_arrow.py

Lines changed: 21 additions & 0 deletions
```diff
@@ -904,3 +904,24 @@ def gen_df(start, num_rows, with_columns=True):
     assert pc.count(table.column("count_col"), mode="only_null").as_py() == 4
     expected = lib.read(sym, query_builder=q, output_format=OutputFormat.PANDAS).data
     assert_frame_equal_with_arrow(table, expected)
+
+
+def test_resample_row_slice_responsible_for_no_buckets(lmdb_version_store_tiny_segment):
+    # Closely mimics test_resampling_row_slice_responsible_for_no_buckets with arrow from test_resample.py
+    # TODO: Remove this test if we enable pipeline tests with arrow
+    lib = lmdb_version_store_tiny_segment
+    lib.set_output_format(OutputFormat.EXPERIMENTAL_ARROW)
+    sym = "sym"
+    df = pd.DataFrame(
+        {
+            "to_sum": [1, 2, 3, 4],
+        },
+        index=[pd.Timestamp(0), pd.Timestamp(100), pd.Timestamp(200), pd.Timestamp(3000)],
+    )
+    lib.write(sym, df)
+
+    q = QueryBuilder().resample("us").agg({"to_sum": ("to_sum", "sum")})
+    date_range = (pd.Timestamp(0), pd.Timestamp(1500))
+    table = lib.read(sym, date_range=date_range, query_builder=q).data
+    expected = pd.DataFrame({"to_sum": [6]}, index=[pd.Timestamp(0)])
+    assert_frame_equal_with_arrow(table, expected)
```
