Skip to content

Commit ca244a4

Browse files
authored
[fix](cache) fix concurrent read-write issue on shared roaring bitmap in inverted index (#59253) (#59411)
#59253
1 parent d8f7adc commit ca244a4

File tree

2 files changed

+135
-17
lines changed

2 files changed

+135
-17
lines changed

be/src/vec/functions/function_ip.h

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -713,14 +713,14 @@ class FunctionIsIPAddressInRange : public IFunction {
713713
// >= min ip
714714
RETURN_IF_ERROR(segment_v2::InvertedIndexQueryParamFactory::create_query_value(
715715
param_type, &min_ip, query_param));
716-
segment_v2::InvertedIndexParam res_param;
717-
res_param.column_name = data_type_with_name.first;
718-
res_param.column_type = data_type_with_name.second;
719-
res_param.query_type = segment_v2::InvertedIndexQueryType::GREATER_EQUAL_QUERY;
720-
res_param.query_value = query_param->get_value();
721-
res_param.num_rows = num_rows;
722-
res_param.roaring = std::make_shared<roaring::Roaring>();
723-
RETURN_IF_ERROR(iter->read_from_index(&res_param));
716+
segment_v2::InvertedIndexParam min_param;
717+
min_param.column_name = data_type_with_name.first;
718+
min_param.column_type = data_type_with_name.second;
719+
min_param.query_type = segment_v2::InvertedIndexQueryType::GREATER_EQUAL_QUERY;
720+
min_param.query_value = query_param->get_value();
721+
min_param.num_rows = num_rows;
722+
min_param.roaring = std::make_shared<roaring::Roaring>();
723+
RETURN_IF_ERROR(iter->read_from_index(&min_param));
724724

725725
// <= max ip
726726
RETURN_IF_ERROR(segment_v2::InvertedIndexQueryParamFactory::create_query_value(
@@ -734,21 +734,18 @@ class FunctionIsIPAddressInRange : public IFunction {
734734
max_param.roaring = std::make_shared<roaring::Roaring>();
735735
RETURN_IF_ERROR(iter->read_from_index(&max_param));
736736

737+
auto result_roaring = std::make_shared<roaring::Roaring>();
738+
*result_roaring = *min_param.roaring & *max_param.roaring;
739+
737740
DBUG_EXECUTE_IF("ip.inverted_index_filtered", {
738741
auto req_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
739742
"ip.inverted_index_filtered", "req_id", 0);
740743
LOG(INFO) << "execute inverted index req_id: " << req_id
741-
<< " min: " << res_param.roaring->cardinality();
742-
});
743-
*res_param.roaring &= *max_param.roaring;
744-
DBUG_EXECUTE_IF("ip.inverted_index_filtered", {
745-
auto req_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
746-
"ip.inverted_index_filtered", "req_id", 0);
747-
LOG(INFO) << "execute inverted index req_id: " << req_id
744+
<< " min: " << min_param.roaring->cardinality()
748745
<< " max: " << max_param.roaring->cardinality()
749-
<< " result: " << res_param.roaring->cardinality();
746+
<< " result: " << result_roaring->cardinality();
750747
});
751-
segment_v2::InvertedIndexResultBitmap result(res_param.roaring, null_bitmap);
748+
segment_v2::InvertedIndexResultBitmap result(result_roaring, null_bitmap);
752749
bitmap_result = result;
753750
bitmap_result.mask_out_null();
754751
return Status::OK();

be/test/vec/function/function_ip_test.cpp

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include "vec/functions/function_ip.h"
19+
1820
#include "function_test_util.h"
1921
#include "gtest/gtest_pred_impl.h"
22+
#include "olap/rowset/segment_v2/index_iterator.h"
23+
#include "olap/rowset/segment_v2/inverted_index_reader.h"
24+
#include "vec/columns/column_const.h"
2025
#include "vec/core/types.h"
2126
#include "vec/data_types/data_type_ipv6.h"
2227
#include "vec/data_types/data_type_number.h"
@@ -158,4 +163,120 @@ TEST(FunctionIpTest, FunctionCutIPv6Test) {
158163
static_cast<void>(check_function<DataTypeString, true>(func_name, input_types, data_set));
159164
}
160165

166+
class MockIndexReader : public segment_v2::InvertedIndexReader {
167+
public:
168+
MockIndexReader(const TabletIndex& index_meta)
169+
: segment_v2::InvertedIndexReader(&index_meta, nullptr) {}
170+
~MockIndexReader() override = default;
171+
segment_v2::InvertedIndexReaderType type() override {
172+
return segment_v2::InvertedIndexReaderType::BKD;
173+
}
174+
Status query(const segment_v2::IndexQueryContextPtr& context, const std::string& column_name,
175+
const void* query_value, segment_v2::InvertedIndexQueryType query_type,
176+
std::shared_ptr<roaring::Roaring>& bit_map) override {
177+
return Status::OK();
178+
}
179+
Status try_query(const segment_v2::IndexQueryContextPtr& context,
180+
const std::string& column_name, const void* query_value,
181+
segment_v2::InvertedIndexQueryType query_type, size_t* count) override {
182+
return Status::OK();
183+
}
184+
Status new_iterator(std::unique_ptr<segment_v2::IndexIterator>* iterator) override {
185+
return Status::OK();
186+
}
187+
};
188+
189+
class MockIndexIterator : public segment_v2::IndexIterator {
190+
public:
191+
MockIndexIterator(std::shared_ptr<MockIndexReader> reader) : _reader(reader) {}
192+
~MockIndexIterator() override = default;
193+
segment_v2::IndexReaderPtr get_reader(segment_v2::IndexReaderType reader_type) const override {
194+
if (std::holds_alternative<segment_v2::InvertedIndexReaderType>(reader_type)) {
195+
if (std::get<segment_v2::InvertedIndexReaderType>(reader_type) ==
196+
segment_v2::InvertedIndexReaderType::BKD) {
197+
return _reader;
198+
}
199+
}
200+
return nullptr;
201+
}
202+
Status read_from_index(const segment_v2::IndexParam& param) override {
203+
auto* p = std::get<segment_v2::InvertedIndexParam*>(param);
204+
if (p->query_type == segment_v2::InvertedIndexQueryType::GREATER_EQUAL_QUERY) {
205+
p->roaring->addRange(10, 20);
206+
} else if (p->query_type == segment_v2::InvertedIndexQueryType::LESS_EQUAL_QUERY) {
207+
p->roaring->addRange(15, 25);
208+
}
209+
return Status::OK();
210+
}
211+
Status read_null_bitmap(segment_v2::InvertedIndexQueryCacheHandle* cache_handle) override {
212+
return Status::OK();
213+
}
214+
Result<bool> has_null() override { return false; }
215+
216+
private:
217+
std::shared_ptr<MockIndexReader> _reader;
218+
};
219+
220+
TEST(FunctionIpTest, evaluate_inverted_index) {
221+
FunctionIsIPAddressInRange func;
222+
223+
// IPv4 test
224+
{
225+
auto cidr_col = ColumnString::create();
226+
cidr_col->insert_data("127.0.0.0/8", 11);
227+
auto const_cidr_col = ColumnConst::create(std::move(cidr_col), 1);
228+
229+
ColumnsWithTypeAndName arguments = {
230+
{std::move(const_cidr_col), std::make_shared<DataTypeString>(), "cidr"}};
231+
232+
std::vector<IndexFieldNameAndTypePair> data_type_with_names = {
233+
{"ip_addr", std::make_shared<DataTypeIPv4>()}};
234+
235+
TabletIndex index_meta;
236+
auto reader = std::make_shared<MockIndexReader>(index_meta);
237+
auto iter = std::make_unique<MockIndexIterator>(reader);
238+
std::vector<segment_v2::IndexIterator*> iterators = {iter.get()};
239+
240+
segment_v2::InvertedIndexResultBitmap bitmap_result;
241+
auto status = func.evaluate_inverted_index(arguments, data_type_with_names, iterators, 100,
242+
bitmap_result);
243+
ASSERT_TRUE(status.ok());
244+
245+
// min_param: [10, 20), max_param: [15, 25)
246+
// intersection: [15, 20) -> 15, 16, 17, 18, 19
247+
ASSERT_EQ(bitmap_result.get_data_bitmap()->cardinality(), 5);
248+
for (int i = 15; i < 20; ++i) {
249+
ASSERT_TRUE(bitmap_result.get_data_bitmap()->contains(i));
250+
}
251+
}
252+
253+
// IPv6 test
254+
{
255+
auto cidr_col = ColumnString::create();
256+
cidr_col->insert_data("ffff::/16", 9);
257+
auto const_cidr_col = ColumnConst::create(std::move(cidr_col), 1);
258+
259+
ColumnsWithTypeAndName arguments = {
260+
{std::move(const_cidr_col), std::make_shared<DataTypeString>(), "cidr"}};
261+
262+
std::vector<IndexFieldNameAndTypePair> data_type_with_names = {
263+
{"ip_addr", std::make_shared<DataTypeIPv6>()}};
264+
265+
TabletIndex index_meta;
266+
auto reader = std::make_shared<MockIndexReader>(index_meta);
267+
auto iter = std::make_unique<MockIndexIterator>(reader);
268+
std::vector<segment_v2::IndexIterator*> iterators = {iter.get()};
269+
270+
segment_v2::InvertedIndexResultBitmap bitmap_result;
271+
auto status = func.evaluate_inverted_index(arguments, data_type_with_names, iterators, 100,
272+
bitmap_result);
273+
ASSERT_TRUE(status.ok());
274+
275+
ASSERT_EQ(bitmap_result.get_data_bitmap()->cardinality(), 5);
276+
for (int i = 15; i < 20; ++i) {
277+
ASSERT_TRUE(bitmap_result.get_data_bitmap()->contains(i));
278+
}
279+
}
280+
}
281+
161282
} // namespace doris::vectorized

0 commit comments

Comments
 (0)