Skip to content

Commit 3c7a4bb

Browse files
authored
[9898173277] Resolve row range edge cases for arrow (#2660)
#### Reference Issues/PRs Fixes monday ref 9898173277. Fixes monday ref 9943908001 when using `query_builder`'s `date_range`. #### What does this implement or fix? - Small refactor to use abstract common `is_slice_in_row_range` and `is_slice_in_index_range` to be used both in regular reads and in the processing pipeline. This resolves the issue fixed in #2632 for query builder date ranges as well. - If the requested row/date range is empty, we no longer read any data keys (previously we would read one) - Removes several unused methods from `query.hpp` and `index_range.hpp` (the index_range one was quite broken) - Adds arrow tests for: - open-ended row/date range filters - empty row/date ranges - negative row ranges #### Any other comments? #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent 64085ab commit 3c7a4bb

File tree

9 files changed

+236
-110
lines changed

9 files changed

+236
-110
lines changed

cpp/arcticdb/entity/index_range.hpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ struct IndexRange {
9090
return left.start_ <= right.end_ && left.end_ >= right.start_;
9191
}
9292

93+
friend bool closed_aware_intersects(const IndexRange& left, const IndexRange& right) {
94+
return left.inclusive_start() <= right.inclusive_end() && left.inclusive_end() >= right.inclusive_start();
95+
}
96+
9397
friend bool intersects(const IndexRange& rg, const IndexValue& start, const IndexValue& end) {
9498
if (!rg.specified_)
9599
return true;
@@ -104,24 +108,21 @@ struct IndexRange {
104108
return left.start_ == right.start_ && left.end_ == right.end_;
105109
}
106110

107-
void adjust_open_closed_interval() {
108-
if (std::holds_alternative<NumericIndex>(start_) && !start_closed_) {
109-
auto start = std::get<NumericIndex>(start_);
110-
start_ = NumericIndex(start + 1);
111-
}
112-
113-
if (std::holds_alternative<NumericIndex>(end_) && !end_closed_) {
114-
auto end = std::get<NumericIndex>(end_);
115-
end_ = NumericIndex(end - 1);
116-
}
117-
}
118-
119111
IndexValue inclusive_end() const {
120112
if (std::holds_alternative<NumericIndex>(end_) && !end_closed_) {
121113
return NumericIndex(std::get<NumericIndex>(end_) - 1);
122114
}
123115
return end_;
124116
}
117+
118+
IndexValue inclusive_start() const {
119+
if (std::holds_alternative<NumericIndex>(start_) && !start_closed_) {
120+
return NumericIndex(std::get<NumericIndex>(start_) + 1);
121+
}
122+
return start_;
123+
}
124+
125+
bool empty() const { return inclusive_start() > inclusive_end(); }
125126
};
126127

127128
inline IndexRange unspecified_range() { return {}; }

cpp/arcticdb/pipeline/frame_slice.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ struct AxisRange : std::pair<size_t, size_t> {
3030

3131
[[nodiscard]] size_t end() const { return second; }
3232

33+
[[nodiscard]] bool empty() const { return first >= second; }
34+
3335
struct Hasher {
3436
template<class T>
3537
std::enable_if_t<std::is_base_of_v<AxisRange, std::decay_t<T>>, std::size_t> operator()(const T& r) const {
@@ -312,4 +314,17 @@ struct formatter<arcticdb::pipelines::SliceAndKey> {
312314
}
313315
};
314316

317+
template<>
318+
struct formatter<arcticdb::pipelines::RangesAndKey> {
319+
template<typename ParseContext>
320+
constexpr auto parse(ParseContext& ctx) {
321+
return ctx.begin();
322+
}
323+
324+
template<typename FormatContext>
325+
auto format(arcticdb::pipelines::RangesAndKey sk, FormatContext& ctx) const {
326+
return fmt::format_to(ctx.out(), "{},{},{},{}", sk.row_range(), sk.col_range(), sk.key_, sk.is_incomplete());
327+
}
328+
};
329+
315330
} // namespace fmt

cpp/arcticdb/pipeline/query.cpp

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,31 @@ namespace arcticdb::pipelines {
1616
using namespace arcticdb::stream;
1717
using namespace arcticdb::pipelines::index;
1818

19-
IndexValue start_index(const std::vector<SliceAndKey>& sk, std::size_t row) { return sk[row].key().start_index(); }
20-
21-
IndexValue start_index(const index::IndexSegmentReader& isr, std::size_t row) {
22-
return index::index_value_from_segment(isr.seg(), row, index::Fields::start_index);
19+
RowRange slice_row_range_at(const IndexSegmentReader& isr, std::size_t row) {
20+
auto start_row = isr.column(index::Fields::start_row).scalar_at<std::size_t>(row).value();
21+
auto end_row = isr.column(index::Fields::end_row).scalar_at<std::size_t>(row).value();
22+
return {start_row, end_row};
2323
}
2424

25-
IndexValue end_index(const index::IndexSegmentReader& isr, std::size_t row) {
26-
return index::index_value_from_segment(isr.seg(), row, index::Fields::end_index);
25+
RowRange slice_row_range_at(const std::vector<SliceAndKey>& sk, std::size_t row) { return sk[row].slice_.row_range; }
26+
27+
bool is_slice_in_row_range(const RowRange& slice_row_range, const RowRange& row_filter) {
28+
// If the row_filter is empty we return false (i.e. don't read the slice).
29+
// This is required for cases like a range (3, 2) which falls completely within an index row (0, 10). We
30+
// know that if the range is empty we don't need to read the data key for (0, 10) because it won't contain
31+
// any elements within the empty range.
32+
return slice_row_range.first < row_filter.second && slice_row_range.second > row_filter.first &&
33+
!row_filter.empty();
2734
}
2835

29-
IndexValue end_index(const std::vector<SliceAndKey>& sk, std::size_t row) { return sk[row].key().end_index(); }
36+
bool is_slice_in_index_range(IndexRange slice_index_range, const IndexRange& index_filter, bool is_read_operation) {
37+
// Typically slice_index_range should be end exclusive, however due to old bugs we have old data written with
38+
// inclusive end_index. So, when we are reading we explicitly set the interval as closed to be able to read
39+
// old_data. The same fix should be done for updates, but that is not implemented yet and should be added with
40+
// https://github.com/man-group/ArcticDB/issues/2655
41+
slice_index_range.end_closed_ = is_read_operation;
42+
return closed_aware_intersects(slice_index_range, index_filter) && !index_filter.empty();
43+
}
3044

3145
template<typename ContainerType, typename IdxType>
3246
std::unique_ptr<util::BitSet> build_bitset_for_index(
@@ -92,16 +106,9 @@ std::unique_ptr<util::BitSet> build_bitset_for_index(
92106
auto start_idx_pos = start_idx_col.template begin<IndexTagType>();
93107
auto end_idx_pos = end_idx_col.template begin<IndexTagType>();
94108

95-
using RawType = typename IndexTagType::DataTypeTag::raw_type;
96-
const auto range_start = std::get<timestamp>(rg.start_);
97-
const auto range_end = std::get<timestamp>(rg.end_);
98109
for (auto i = 0u; i < container.size(); ++i) {
99-
// If we are reading, we want to include the the end index, in order to support backwards compatibility with
100-
// older versions. The same fix should be done for updates, but that is not implemented yet and should be
101-
// added with https://github.com/man-group/ArcticDB/issues/2655
102-
const auto adjusted_end_idx_pos = is_read_operation ? *end_idx_pos : *end_idx_pos - 1;
103110
const auto intersects =
104-
range_intersects<RawType>(range_start, range_end, *start_idx_pos, adjusted_end_idx_pos);
111+
is_slice_in_index_range(IndexRange(*start_idx_pos, *end_idx_pos), rg, is_read_operation);
105112
(*res)[i] = intersects;
106113
if (intersects)
107114
ARCTICDB_DEBUG(log::version(), "range intersects at {}", i);

cpp/arcticdb/pipeline/query.hpp

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -160,28 +160,15 @@ inline FilterQuery<index::IndexSegmentReader> create_dynamic_col_filter(
160160
};
161161
}
162162

163-
inline std::size_t start_row(const index::IndexSegmentReader& isr, std::size_t row) {
164-
return isr.column(index::Fields::start_row).scalar_at<std::size_t>(row).value();
165-
}
166-
167-
inline std::size_t start_row(const std::vector<SliceAndKey>& sk, std::size_t row) {
168-
return sk[row].slice_.row_range.first;
169-
}
170-
171-
inline std::size_t end_row(const index::IndexSegmentReader& isr, std::size_t row) {
172-
return isr.column(index::Fields::end_row).scalar_at<std::size_t>(row).value();
173-
}
174-
175-
inline std::size_t end_row(const std::vector<SliceAndKey>& sk, std::size_t row) {
176-
return sk[row].slice_.row_range.second;
177-
}
163+
RowRange slice_row_range_at(const std::vector<SliceAndKey>& sk, std::size_t row);
164+
RowRange slice_row_range_at(const index::IndexSegmentReader& isr, std::size_t row);
178165

179166
template<typename ContainerType>
180167
inline FilterQuery<ContainerType> create_row_filter(RowRange&& range) {
181168
return [rg = std::move(range)](const ContainerType& container, std::unique_ptr<util::BitSet>&& input) mutable {
182169
auto res = std::make_unique<util::BitSet>(static_cast<util::BitSetSizeType>(container.size()));
183170
for (std::size_t r = 0, end = container.size(); r < end; ++r) {
184-
bool included = start_row(container, r) < rg.second && end_row(container, r) > rg.first;
171+
bool included = is_slice_in_row_range(slice_row_range_at(container, r), rg);
185172
ARCTICDB_DEBUG(log::version(), "Row {} is {} range {}", r, included ? "inside" : "outside", rg);
186173
(*res)[r] = included;
187174
}
@@ -194,18 +181,8 @@ inline FilterQuery<ContainerType> create_row_filter(RowRange&& range) {
194181
};
195182
}
196183

197-
IndexValue start_index(const std::vector<SliceAndKey>& sk, std::size_t row);
198-
199-
IndexValue start_index(const index::IndexSegmentReader& isr, std::size_t row);
200-
201-
IndexValue end_index(const index::IndexSegmentReader& isr, std::size_t row);
202-
203-
IndexValue end_index(const std::vector<SliceAndKey>& sk, std::size_t row);
204-
205-
template<typename RawType>
206-
bool range_intersects(RawType a_start, RawType a_end, RawType b_start, RawType b_end) {
207-
return a_start <= b_end && a_end >= b_start;
208-
}
184+
bool is_slice_in_row_range(const RowRange& slice_row_range, const RowRange& row_filter);
185+
bool is_slice_in_index_range(IndexRange slice_index_range, const IndexRange& index_filter, bool is_read_operation);
209186

210187
template<typename ContainerType, typename IdxType>
211188
std::unique_ptr<util::BitSet> build_bitset_for_index(

cpp/arcticdb/pipeline/read_frame.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,15 @@ SegmentInMemory allocate_chunked_frame(const std::shared_ptr<PipelineContext>& c
107107
};
108108
auto handlers = TypeHandlerRegistry::instance();
109109

110-
for (auto& column : output.columns()) {
111-
auto handler = handlers->get_handler(output_format, column->type());
112-
const auto data_size = data_type_size(column->type(), output_format, DataTypeMode::EXTERNAL);
113-
for (auto block_row_count : block_row_counts) {
114-
const auto bytes = block_row_count * data_size;
115-
column->allocate_data(bytes);
116-
column->advance_data(bytes);
110+
if (row_count > 0) {
111+
for (auto& column : output.columns()) {
112+
auto handler = handlers->get_handler(output_format, column->type());
113+
const auto data_size = data_type_size(column->type(), output_format, DataTypeMode::EXTERNAL);
114+
for (auto block_row_count : block_row_counts) {
115+
const auto bytes = block_row_count * data_size;
116+
column->allocate_data(bytes);
117+
column->advance_data(bytes);
118+
}
117119
}
118120
}
119121

cpp/arcticdb/processing/clause.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <arcticdb/processing/clause.hpp>
1818
#include <arcticdb/pipeline/column_stats.hpp>
1919
#include <arcticdb/pipeline/frame_slice.hpp>
20+
#include <arcticdb/pipeline/query.hpp>
2021
#include <arcticdb/util/test/random_throw.hpp>
2122
#include <ankerl/unordered_dense.h>
2223
#include <arcticdb/util/movable_priority_queue.hpp>
@@ -1376,12 +1377,13 @@ std::vector<EntityId> ColumnStatsGenerationClause::process(std::vector<EntityId>
13761377
}
13771378

13781379
std::vector<std::vector<size_t>> RowRangeClause::structure_for_processing(std::vector<RangesAndKey>& ranges_and_keys) {
1380+
auto row_range_filter = RowRange{start_, end_};
13791381
ranges_and_keys.erase(
13801382
std::remove_if(
13811383
ranges_and_keys.begin(),
13821384
ranges_and_keys.end(),
1383-
[this](const RangesAndKey& ranges_and_key) {
1384-
return ranges_and_key.row_range_.start() >= end_ || ranges_and_key.row_range_.end() <= start_;
1385+
[&](const RangesAndKey& ranges_and_key) {
1386+
return !is_slice_in_row_range(ranges_and_key.row_range(), row_range_filter);
13851387
}
13861388
),
13871389
ranges_and_keys.end()
@@ -1532,13 +1534,14 @@ std::vector<std::vector<size_t>> DateRangeClause::structure_for_processing(std::
15321534
processing_config_.index_type_ == IndexDescriptor::Type::TIMESTAMP,
15331535
"Cannot use date range with non-timestamp indexed data"
15341536
);
1537+
auto index_filter = IndexRange(start_, end_);
15351538
ranges_and_keys.erase(
15361539
std::remove_if(
15371540
ranges_and_keys.begin(),
15381541
ranges_and_keys.end(),
1539-
[this](const RangesAndKey& ranges_and_key) {
1540-
auto [start_index, end_index] = ranges_and_key.key_.time_range();
1541-
return start_index > end_ || end_index <= start_;
1542+
[&](const RangesAndKey& ranges_and_key) {
1543+
auto slice_index_range = IndexRange(ranges_and_key.key_.time_range());
1544+
return !is_slice_in_index_range(slice_index_range, index_filter, true);
15421545
}
15431546
),
15441547
ranges_and_keys.end()

python/tests/compat/arcticdb/test_compatibility.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -434,11 +434,14 @@ def test_compat_update_old_updated_data(pandas_v1_venv, s3_ssl_disabled_storage,
434434
(None, pd.Timestamp("2025-01-03 10:00:00")), # Intersects with problematic range at end
435435
],
436436
)
437-
def test_compat_arrow_range_old_updated_data(pandas_v1_venv, s3_ssl_disabled_storage, lib_name, date_range):
437+
@pytest.mark.parametrize("use_query_builder", [True, False])
438+
def test_compat_date_range_old_updated_data(
439+
pandas_v1_venv, s3_ssl_disabled_storage, lib_name, date_range, use_query_builder, any_output_format
440+
):
438441
# There was a bug where data written using update and old versions of ArcticDB produced data keys where the
439442
# end_index value was not 1 nanosecond larger than the last index value in the segment (as it should be), but
440443
# instead contained the start of the date_range passed into the update call.
441-
# We want to verify C++ truncation within arrow works with the old broken end index values.
444+
# We want to verify reading date range of the old broken end index values works.
442445
arctic_uri = s3_ssl_disabled_storage.arctic_uri
443446
with CompatLibrary(pandas_v1_venv, arctic_uri, lib_name) as compat:
444447
sym = "sym"
@@ -456,6 +459,11 @@ def test_compat_arrow_range_old_updated_data(pandas_v1_venv, s3_ssl_disabled_sto
456459
compat.old_lib.update(sym, df_1, '(pd.Timestamp("2025-01-03 00:00:00"), None)')
457460
compat.old_lib.update(sym, df_2, '(pd.Timestamp("2025-01-04 00:00:00"), None)')
458461

462+
expected_df = pd.concat([df_0.iloc[:1], df_1.iloc[:1], df_2])
463+
filter_after_start = expected_df.index >= date_range[0] if date_range[0] else True
464+
filter_before_end = expected_df.index <= date_range[1] if date_range[1] else True
465+
expected_df = expected_df[filter_after_start & filter_before_end]
466+
459467
# Resample using current version
460468
with compat.current_version() as curr:
461469
index_df = curr.lib._nvs.read_index(sym)
@@ -465,9 +473,12 @@ def test_compat_arrow_range_old_updated_data(pandas_v1_venv, s3_ssl_disabled_sto
465473
assert index_df["end_index"].iloc[1] == pd.Timestamp("2025-01-04 00:00:00")
466474
assert index_df["end_index"].iloc[2] == pd.Timestamp("2025-01-05 23:00:00") + pd.Timedelta(1, unit="ns")
467475

468-
arrow_table = curr.lib.read(sym, date_range=date_range, output_format=OutputFormat.EXPERIMENTAL_ARROW).data
469-
expected_df = curr.lib.read(sym, date_range=date_range).data
470-
assert_frame_equal_with_arrow(arrow_table, expected_df)
476+
if use_query_builder:
477+
q = QueryBuilder().date_range(date_range)
478+
result = curr.lib.read(sym, query_builder=q, output_format=any_output_format).data
479+
else:
480+
result = curr.lib.read(sym, date_range=date_range, output_format=any_output_format).data
481+
assert_frame_equal_with_arrow(result, expected_df)
471482

472483

473484
def test_norm_meta_column_and_index_names_write_old_read_new(old_venv_and_arctic_uri, lib_name):

python/tests/integration/arcticdb/version_store/test_num_storage_operations.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
config_context,
1313
config_context_string,
1414
)
15+
from arcticdb.version_store.processing import QueryBuilder
1516
import arcticdb.toolbox.query_stats as qs
1617

1718

@@ -281,6 +282,9 @@ def get_update_dataframe(start_index, end_index):
281282

282283

283284
def get_num_data_keys_intersecting_row_range(index, start, end):
285+
if end <= start:
286+
# We shouldn't read any data keys if row range is empty
287+
return 0
284288
count = 0
285289
for index, row in index.iterrows():
286290
if (start is None or start < row["end_row"]) and (end is None or end > row["start_row"]):
@@ -289,6 +293,9 @@ def get_num_data_keys_intersecting_row_range(index, start, end):
289293

290294

291295
def get_num_data_keys_intersecting_date_range(index, start, end, exclude_fully_included=False):
296+
if start is not None and end is not None and end < start:
297+
# We shouldn't read any data keys if date range is empty
298+
return 0
292299
count = 0
293300
for i, (_, row) in enumerate(index.reset_index().iterrows()):
294301
# end is inclusive when doing date_range but end_index in the column is exclusive
@@ -325,8 +332,11 @@ def get_num_data_keys_intersecting_date_range(index, start, end, exclude_fully_i
325332
@pytest.mark.parametrize(
326333
"row_range_start, row_range_end", [(0, 0), (5, 5), (0, 1), (1, 2), (5, 6), (0, 4), (1, 5), (0, 6), (6, 15), (0, 15)]
327334
)
335+
@pytest.mark.parametrize("use_query_builder", [True, False])
328336
@pytest.mark.parametrize("dynamic_schema", [True, False])
329-
def test_row_range_num_reads(s3_store_factory, clear_query_stats, dynamic_schema, row_range_start, row_range_end):
337+
def test_row_range_num_reads(
338+
s3_store_factory, clear_query_stats, dynamic_schema, row_range_start, row_range_end, use_query_builder
339+
):
330340
with config_context("VersionMap.ReloadInterval", 0):
331341
lib = s3_store_factory(column_group_size=2, segment_row_size=2, dynamic_schema=dynamic_schema)
332342
qs.enable()
@@ -343,7 +353,12 @@ def test_row_range_num_reads(s3_store_factory, clear_query_stats, dynamic_schema
343353
assert sum_operations_by_type(stats, "S3_GetObject") == 2
344354

345355
expected_df = df.iloc[row_range_start:row_range_end]
346-
result_df = lib.read(sym, row_range=(row_range_start, row_range_end)).data
356+
row_range = (row_range_start, row_range_end)
357+
if use_query_builder:
358+
q = QueryBuilder().row_range(row_range)
359+
result_df = lib.read(sym, query_builder=q).data
360+
else:
361+
result_df = lib.read(sym, row_range=row_range).data
347362
stats = qs.get_query_stats()
348363
qs.reset_stats()
349364
assert_frame_equal(result_df, expected_df)
@@ -353,8 +368,11 @@ def test_row_range_num_reads(s3_store_factory, clear_query_stats, dynamic_schema
353368

354369

355370
@pytest.mark.parametrize("date_range_start, date_range_end", date_ranges_to_test)
371+
@pytest.mark.parametrize("use_query_builder", [True, False])
356372
@pytest.mark.parametrize("dynamic_schema", [True, False])
357-
def test_date_range_num_reads(s3_store_factory, clear_query_stats, dynamic_schema, date_range_start, date_range_end):
373+
def test_date_range_num_reads(
374+
s3_store_factory, clear_query_stats, dynamic_schema, date_range_start, date_range_end, use_query_builder
375+
):
358376
with config_context("VersionMap.ReloadInterval", 0):
359377
lib = s3_store_factory(column_group_size=2, segment_row_size=2, dynamic_schema=dynamic_schema)
360378
qs.enable()
@@ -373,7 +391,12 @@ def test_date_range_num_reads(s3_store_factory, clear_query_stats, dynamic_schem
373391
assert sum_operations_by_type(stats, "S3_GetObject") == 2
374392

375393
expected_df = df.loc[date_range_start:date_range_end]
376-
result_df = lib.read(sym, date_range=(date_range_start, date_range_end)).data
394+
date_range = (date_range_start, date_range_end)
395+
if use_query_builder:
396+
q = QueryBuilder().date_range(date_range)
397+
result_df = lib.read(sym, query_builder=q).data
398+
else:
399+
result_df = lib.read(sym, date_range=date_range).data
377400
stats = qs.get_query_stats()
378401
qs.reset_stats()
379402
assert_frame_equal(result_df, expected_df)

0 commit comments

Comments
 (0)