Skip to content

Commit 59551d3

Browse files
authored
Fix range intersection calculation in query.cpp (#2632)
#### Reference Issues/PRs Monday refs: - 9943908001 - 10031061308 #### What does this implement or fix? - Fixes reading of indexes with old format (Monday ref: 9943908001) - Main fix: cpp/arcticdb/pipeline/query.cpp - tests: cpp/arcticdb/pipeline/test/test_query.cpp and python/tests/integration/arcticdb/version_store/test_num_storage_operations.py - Fixes a bug where delete_range was not updating the index correctly (Monday ref: 10031061308) - Main fix: cpp/arcticdb/version/version_core.cpp - Tests: https://github.com/man-group/ArcticDB/pull/2632/files#diff-7a61fb14e9865723e613017a346654e7c2694ffdb5eb13617e1665d1ad1ccba5R784 - Also tries to fix timeouts in macos builds #### Any other comments? #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent 90505d1 commit 59551d3

18 files changed

+251
-155
lines changed

.github/workflows/build_with_conda.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ jobs:
172172
eval "$command"
173173
else
174174
cd python
175-
python -m pytest --timeout=3600 -n logical --dist worksteal -v tests $ARCTICDB_PYTEST_ARGS
175+
python -m pytest --timeout=3600 -v -n logical --dist worksteal tests $ARCTICDB_PYTEST_ARGS
176176
fi
177177
env:
178178
ARCTICDB_USING_CONDA: 1
@@ -281,7 +281,7 @@ jobs:
281281
eval "$command"
282282
else
283283
cd python
284-
python -m pytest --timeout=3600 -n logical --dist worksteal tests $ARCTICDB_PYTEST_ARGS
284+
python -m pytest --timeout=3600 -v -n logical --dist worksteal tests $ARCTICDB_PYTEST_ARGS
285285
fi
286286
env:
287287
ARCTICDB_USING_CONDA: 1

cpp/arcticdb/pipeline/index_utils.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ folly::Future<entity::AtomKey> write_index(
2525
for (const auto& slice_and_key : slice_and_keys) {
2626
writer.add(slice_and_key.key(), slice_and_key.slice_);
2727
}
28+
2829
return writer.commit();
2930
}
3031

cpp/arcticdb/pipeline/index_writer.hpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,18 @@ class IndexWriter {
9797
);
9898

9999
bool new_col_group = !current_col_.has_value() || *current_col_ < slice.col_range.first;
100+
bool missing_row_val = !current_row_.has_value();
101+
bool is_valid_col = *current_col_ == slice.col_range.first;
102+
bool is_valid_row = *current_row_ < slice.row_range.first;
103+
bool is_valid = (is_valid_col && is_valid_row);
100104
util::check_arg(
101-
!current_row_.has_value() || new_col_group ||
102-
(*current_col_ == slice.col_range.first && *current_row_ < slice.row_range.first),
103-
"expected increasing row group, last col range left value {}, arg {}",
105+
missing_row_val || new_col_group || is_valid,
106+
"expected increasing row group, last col range left value {}, col arg {}, row left value {}, row arg "
107+
"{}",
104108
current_col_.value_or(-1),
105-
slice.col_range
109+
slice.col_range,
110+
current_row_.value_or(-1),
111+
slice.row_range
106112
);
107113

108114
add_unchecked(key, slice);

cpp/arcticdb/pipeline/query.cpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ template<typename ContainerType, typename IdxType>
3232
std::unique_ptr<util::BitSet> build_bitset_for_index(
3333
const ContainerType& container,
3434
IndexRange rg, // IndexRange is expected to be inclusive on both ends
35-
bool dynamic_schema, bool column_groups, std::unique_ptr<util::BitSet>&& input
35+
bool dynamic_schema, bool column_groups, bool is_read_operation, std::unique_ptr<util::BitSet>&& input
3636
) {
3737
auto res = std::make_unique<util::BitSet>(static_cast<util::BitSetSizeType>(container.size()));
3838
if (container.empty())
@@ -53,7 +53,11 @@ std::unique_ptr<util::BitSet> build_bitset_for_index(
5353

5454
// End index column is exclusive. We want to find the last position where `range_start` is < end_index at
5555
// position. This is equivalent to finding the first position where range_start + 1 >= end_index at position.
56-
auto start_pos = std::lower_bound(end_index_col_begin, end_index_col_end, range_start + 1);
56+
// If we are reading, we want to include the start index, in order to support backwards compatibility with older
57+
// versions. The same fix should be done for updates, but that is not implemented yet and should be added with
58+
// https://github.com/man-group/ArcticDB/issues/2655
59+
const auto adjusted_range_start = is_read_operation ? range_start : range_start + 1;
60+
auto start_pos = std::lower_bound(end_index_col_begin, end_index_col_end, adjusted_range_start);
5761

5862
if (start_pos == end_idx_col.template end<IndexTagType>()) {
5963
ARCTICDB_DEBUG(log::version(), "Returning as start pos is at end");
@@ -92,7 +96,12 @@ std::unique_ptr<util::BitSet> build_bitset_for_index(
9296
const auto range_start = std::get<timestamp>(rg.start_);
9397
const auto range_end = std::get<timestamp>(rg.end_);
9498
for (auto i = 0u; i < container.size(); ++i) {
95-
const auto intersects = range_intersects<RawType>(range_start, range_end, *start_idx_pos, *end_idx_pos - 1);
99+
// If we are reading, we want to include the the end index, in order to support backwards compatibility with
100+
// older versions. The same fix should be done for updates, but that is not implemented yet and should be
101+
// added with https://github.com/man-group/ArcticDB/issues/2655
102+
const auto adjusted_end_idx_pos = is_read_operation ? *end_idx_pos : *end_idx_pos - 1;
103+
const auto intersects =
104+
range_intersects<RawType>(range_start, range_end, *start_idx_pos, adjusted_end_idx_pos);
96105
(*res)[i] = intersects;
97106
if (intersects)
98107
ARCTICDB_DEBUG(log::version(), "range intersects at {}", i);
@@ -113,10 +122,11 @@ std::unique_ptr<util::BitSet> build_bitset_for_index(
113122

114123
template std::unique_ptr<util::BitSet> build_bitset_for_index<
115124
IndexSegmentReader,
116-
TimeseriesIndex>(const index::IndexSegmentReader&, IndexRange, bool, bool, std::unique_ptr<util::BitSet>&&);
125+
TimeseriesIndex>(const index::IndexSegmentReader&, IndexRange, bool, bool, bool, std::unique_ptr<util::BitSet>&&);
117126
template std::unique_ptr<util::BitSet> build_bitset_for_index<
118127
IndexSegmentReader,
119-
TableIndex>(const index::IndexSegmentReader&, IndexRange, bool, bool, std::unique_ptr<util::BitSet>&&);
128+
TableIndex>(const index::IndexSegmentReader&, IndexRange, bool, bool, bool, std::unique_ptr<util::BitSet>&&);
120129
template std::unique_ptr<util::BitSet> build_bitset_for_index<
121-
TestContainer, TimeseriesIndex>(const TestContainer&, IndexRange, bool, bool, std::unique_ptr<util::BitSet>&&);
130+
TestContainer,
131+
TimeseriesIndex>(const TestContainer&, IndexRange, bool, bool, bool, std::unique_ptr<util::BitSet>&&);
122132
} // namespace arcticdb::pipelines

cpp/arcticdb/pipeline/query.hpp

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -209,29 +209,29 @@ bool range_intersects(RawType a_start, RawType a_end, RawType b_start, RawType b
209209

210210
template<typename ContainerType, typename IdxType>
211211
std::unique_ptr<util::BitSet> build_bitset_for_index(
212-
const ContainerType& container, IndexRange rg, bool dynamic_schema, bool column_groups,
212+
const ContainerType& container, IndexRange rg, bool dynamic_schema, bool column_groups, bool is_read_operation,
213213
std::unique_ptr<util::BitSet>&& input
214214
);
215215

216216
template<typename ContainerType>
217217
inline FilterQuery<ContainerType> create_index_filter(
218-
const IndexRange& range, bool dynamic_schema, bool column_groups
218+
const IndexRange& range, bool dynamic_schema, bool column_groups, bool is_read_operation
219219
) {
220220
static_assert(std::is_same_v<ContainerType, index::IndexSegmentReader>);
221-
return [rg = range,
222-
dynamic_schema,
223-
column_groups](const ContainerType& container, std::unique_ptr<util::BitSet>&& input) mutable {
221+
return [rg = range, dynamic_schema, column_groups, is_read_operation](
222+
const ContainerType& container, std::unique_ptr<util::BitSet>&& input
223+
) mutable {
224224
auto maybe_index_type = container.seg().template scalar_at<uint8_t>(0u, int(index::Fields::index_type));
225225
const auto index_type = IndexDescriptor::Type(maybe_index_type.value());
226226
switch (index_type) {
227227
case IndexDescriptorImpl::Type::TIMESTAMP: {
228228
return build_bitset_for_index<ContainerType, stream::TimeseriesIndex>(
229-
container, rg, dynamic_schema, column_groups, std::move(input)
229+
container, rg, dynamic_schema, column_groups, is_read_operation, std::move(input)
230230
);
231231
}
232232
case IndexDescriptorImpl::Type::STRING: {
233233
return build_bitset_for_index<ContainerType, stream::TableIndex>(
234-
container, rg, dynamic_schema, column_groups, std::move(input)
234+
container, rg, dynamic_schema, column_groups, is_read_operation, std::move(input)
235235
);
236236
}
237237
default:
@@ -252,7 +252,8 @@ inline void build_row_read_query_filters(
252252
},
253253
[&](const IndexRange& index_range) {
254254
if (index_range.specified_) {
255-
queries.emplace_back(create_index_filter<ContainerType>(index_range, dynamic_schema, column_groups)
255+
queries.emplace_back(
256+
create_index_filter<ContainerType>(index_range, dynamic_schema, column_groups, true)
256257
);
257258
}
258259
},
@@ -335,25 +336,45 @@ inline std::vector<FilterQuery<ContainerType>> build_update_query_filters(
335336
std::holds_alternative<stream::TimeseriesIndex>(index),
336337
"Cannot partition by time when a rowcount-indexed frame was supplied"
337338
);
338-
queries.emplace_back(
339-
create_index_filter<ContainerType>(IndexRange{index_range}, dynamic_schema, column_groups)
340-
);
339+
queries.emplace_back(create_index_filter<ContainerType>(
340+
IndexRange{index_range}, dynamic_schema, column_groups, false
341+
));
341342
},
342343
[&](const auto&) {
343344
util::variant_match(
344345
index,
345346
[&](const stream::TimeseriesIndex&) {
346347
queries.emplace_back(create_index_filter<ContainerType>(
347-
IndexRange{index_range}, dynamic_schema, column_groups
348+
IndexRange{index_range}, dynamic_schema, column_groups, false
348349
));
349350
},
350-
[&](const stream::RowCountIndex&) {
351-
RowRange row_range{
352-
std::get<NumericId>(index_range.start_), std::get<NumericIndex>(index_range.end_)
353-
};
354-
queries.emplace_back(create_row_filter<ContainerType>(std::move(row_range)));
351+
[&](const IndexRange& index_range) {
352+
util::check(
353+
std::holds_alternative<stream::TimeseriesIndex>(index),
354+
"Cannot partition by time when a rowcount-indexed frame was supplied"
355+
);
356+
queries.emplace_back(create_index_filter<ContainerType>(
357+
IndexRange{index_range}, dynamic_schema, column_groups, false
358+
));
355359
},
356-
[&](const auto&) {}
360+
[&](const auto&) {
361+
util::variant_match(
362+
index,
363+
[&](const stream::TimeseriesIndex&) {
364+
queries.emplace_back(create_index_filter<ContainerType>(
365+
IndexRange{index_range}, dynamic_schema, column_groups, false
366+
));
367+
},
368+
[&](const stream::RowCountIndex&) {
369+
RowRange row_range{
370+
std::get<NumericId>(index_range.start_),
371+
std::get<NumericIndex>(index_range.end_)
372+
};
373+
queries.emplace_back(create_row_filter<ContainerType>(std::move(row_range)));
374+
},
375+
[&](const auto&) {}
376+
);
377+
}
357378
);
358379
}
359380
);

cpp/arcticdb/pipeline/test/test_query.cpp

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,69 +11,89 @@
1111

1212
#include <arcticdb/pipeline/test/test_container.hpp>
1313

14-
TEST(BitsetForIndex, DynamicSchemaStrictlyBefore) {
14+
struct BitsetForIndex : public testing::TestWithParam<bool> {
15+
bool is_read_operation() const { return GetParam(); }
16+
};
17+
18+
TEST_P(BitsetForIndex, DynamicSchemaStrictlyBefore) {
1519
using namespace arcticdb;
1620
using namespace arcticdb::pipelines;
1721
TestContainer container;
1822
container.seg().set_range(3, 4);
1923
container.seg().set_range(5, 7);
2024
IndexRange rg(NumericIndex{0}, NumericIndex{2});
2125
auto bitset = build_bitset_for_index<TestContainer, TimeseriesIndex>(
22-
container, rg, true, false, std::unique_ptr<util::BitSet>{}
26+
container, rg, true, false, is_read_operation(), std::unique_ptr<util::BitSet>{}
2327
);
2428
ASSERT_EQ(bitset->count(), 0);
2529
}
2630

27-
TEST(BitsetForIndex, DynamicSchemaStrictlyAfter) {
31+
TEST_P(BitsetForIndex, DynamicSchemaStrictlyAfter) {
2832
using namespace arcticdb;
2933
using namespace arcticdb::pipelines;
3034
TestContainer container;
3135
container.seg().set_range(0, 2);
3236
container.seg().set_range(3, 4);
3337
IndexRange rg(NumericIndex{5}, NumericIndex{7});
3438
auto bitset = build_bitset_for_index<TestContainer, TimeseriesIndex>(
35-
container, rg, true, false, std::unique_ptr<util::BitSet>{}
39+
container, rg, true, false, is_read_operation(), std::unique_ptr<util::BitSet>{}
3640
);
3741
ASSERT_EQ(bitset->count(), 0);
3842
}
3943

40-
TEST(BitsetForIndex, DynamicSchemaMiddle) {
44+
TEST_P(BitsetForIndex, DynamicSchemaMiddle) {
4145
using namespace arcticdb;
4246
using namespace arcticdb::pipelines;
4347
TestContainer container;
4448
container.seg().set_range(0, 2);
4549
container.seg().set_range(5, 7);
4650
IndexRange rg(NumericIndex{3}, NumericIndex{4});
4751
auto bitset = build_bitset_for_index<TestContainer, TimeseriesIndex>(
48-
container, rg, true, false, std::unique_ptr<util::BitSet>{}
52+
container, rg, true, false, is_read_operation(), std::unique_ptr<util::BitSet>{}
4953
);
5054
ASSERT_EQ(bitset->count(), 0);
5155
}
5256

53-
TEST(BitsetForIndex, DynamicSchemaOverlapBegin) {
57+
TEST_P(BitsetForIndex, DynamicSchemaOverlapBegin) {
5458
using namespace arcticdb;
5559
using namespace arcticdb::pipelines;
5660
TestContainer container;
5761
container.seg().set_range(2, 4);
5862
container.seg().set_range(5, 7);
5963
IndexRange rg(NumericIndex{1}, NumericIndex{3});
6064
auto bitset = build_bitset_for_index<TestContainer, TimeseriesIndex>(
61-
container, rg, true, false, std::unique_ptr<util::BitSet>{}
65+
container, rg, true, false, is_read_operation(), std::unique_ptr<util::BitSet>{}
6266
);
6367
ASSERT_EQ((*bitset)[0], true);
6468
ASSERT_EQ(bitset->count(), 1);
6569
}
6670

67-
TEST(BitsetForIndex, DynamicSchemaOverlapEnd) {
71+
TEST_P(BitsetForIndex, DynamicSchemaOverlapEnd) {
6872
using namespace arcticdb;
6973
using namespace arcticdb::pipelines;
7074
TestContainer container;
7175
container.seg().set_range(2, 4);
7276
container.seg().set_range(5, 7);
7377
IndexRange rg(NumericIndex{6}, NumericIndex{8});
7478
auto bitset = build_bitset_for_index<TestContainer, TimeseriesIndex>(
75-
container, rg, true, false, std::unique_ptr<util::BitSet>{}
79+
container, rg, true, false, is_read_operation(), std::unique_ptr<util::BitSet>{}
7680
);
7781
ASSERT_EQ((*bitset)[1], true);
7882
ASSERT_EQ(bitset->count(), 1);
7983
}
84+
85+
TEST_P(BitsetForIndex, DynamicSchemaMatchEndIndex) {
86+
using namespace arcticdb;
87+
using namespace arcticdb::pipelines;
88+
TestContainer container;
89+
container.seg().set_range(2, 4);
90+
container.seg().set_range(5, 7);
91+
IndexRange rg(NumericIndex{7}, NumericIndex{7});
92+
auto bitset = build_bitset_for_index<TestContainer, TimeseriesIndex>(
93+
container, rg, true, false, is_read_operation(), std::unique_ptr<util::BitSet>{}
94+
);
95+
ASSERT_EQ((*bitset)[1], is_read_operation());
96+
ASSERT_EQ(bitset->count(), is_read_operation() ? 1 : 0);
97+
}
98+
99+
INSTANTIATE_TEST_SUITE_P(BitsetForIndexTests, BitsetForIndex, testing::Values(true, false));

cpp/arcticdb/version/version_core.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,12 @@ VersionedItem delete_range_impl(
325325
std::sort(std::begin(flattened_slice_and_keys), std::end(flattened_slice_and_keys));
326326
auto version_key_fut = util::variant_match(
327327
index,
328-
[&index_segment_reader, &flattened_slice_and_keys, &stream_id, &update_info, &store](auto idx) {
328+
[&index_segment_reader, &flattened_slice_and_keys, &stream_id, &update_info, &store, &row_count](auto idx) {
329329
using IndexType = decltype(idx);
330+
auto tsd = std::make_shared<TimeseriesDescriptor>(index_segment_reader.tsd().clone());
331+
tsd->set_total_rows(row_count);
330332
return pipelines::index::write_index<IndexType>(
331-
index_segment_reader.tsd(),
333+
*tsd,
332334
std::move(flattened_slice_and_keys),
333335
IndexPartialKey{stream_id, update_info.next_version_id_},
334336
store

python/tests/conftest.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1387,6 +1387,21 @@ def lmdb_version_store_static_and_dynamic(request) -> Generator[NativeVersionSto
13871387
yield request.getfixturevalue(request.param)
13881388

13891389

1390+
@pytest.fixture(
1391+
scope="function",
1392+
params=(
1393+
"lmdb_version_store_empty_types_v1",
1394+
"lmdb_version_store_empty_types_dynamic_schema_v1",
1395+
),
1396+
)
1397+
def lmdb_version_store_static_and_dynamic_v1(request) -> Generator[NativeVersionStore, None, None]:
1398+
"""
1399+
Designed to test the Native version store with API both static and dynamic schema
1400+
Uses only lmdb with encoding V1.
1401+
"""
1402+
yield request.getfixturevalue(request.param)
1403+
1404+
13901405
@pytest.fixture(
13911406
scope="function",
13921407
params=(

python/tests/integration/arcticdb/test_read_batch_more.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def test_read_batch_2tables_7reads_different_slices(arctic_library):
147147
assert_frame_equal(df2_0_allfilters, batch[7].data)
148148

149149

150-
@pytest.mark.xfail(reason="ArcticDB#1970")
150+
@pytest.mark.skip(reason="ArcticDB#1970")
151151
@pytest.mark.storage
152152
def test_read_batch_query_with_and(arctic_library):
153153
"""
@@ -175,6 +175,7 @@ def test_read_batch_query_with_and(arctic_library):
175175

176176

177177
@pytest.mark.storage
178+
@pytest.mark.skip(reason="ArcticDB#2004")
178179
def test_read_batch_metadata_on_different_version(arctic_library):
179180
"""
180181
Here we test if read of metadata over several different states of DB with
@@ -388,7 +389,7 @@ def test_read_batch_multiple_wrong_things_at_once(arctic_library):
388389
assert_frame_equal_rebuild_index_first(df, batch[5].data)
389390

390391

391-
@pytest.mark.xfail(reason="ArcticDB#2004")
392+
@pytest.mark.skip(reason="ArcticDB#2004")
392393
@pytest.mark.storage
393394
def test_read_batch_query_and_columns_returned_order(arctic_library):
394395
"""
@@ -413,7 +414,7 @@ def q(q):
413414
assert_frame_equal_rebuild_index_first(df_filtered, batch[0].data)
414415

415416

416-
@pytest.mark.xfail(reason="ArcticDB#2005")
417+
@pytest.mark.skip(reason="ArcticDB#2005")
417418
@pytest.mark.storage
418419
def test_read_batch_query_and_columns_wrong_column_names_passed(arctic_library):
419420
"""

python/tests/integration/arcticdb/test_unicode_strings.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def test_recursive_normalizers_blns(lmdb_version_store):
105105
assert_dicts_of_dfs_equal(dict, vit.data)
106106

107107

108-
@pytest.mark.xfail(reason="These do not roundtrip properly. Monday: 9256783357")
108+
@pytest.mark.skip(reason="These do not roundtrip properly. Monday: 9256783357")
109109
def test_recursive_normalizers_blns_in_keys(lmdb_version_store):
110110
lib = lmdb_version_store
111111
strings = read_strings()

0 commit comments

Comments
 (0)