Skip to content

Commit d9a931a

Browse files
Mryangemorningman
authored andcommitted
pick 58636
1 parent e3aa550 commit d9a931a

File tree

10 files changed

+495
-157
lines changed

10 files changed

+495
-157
lines changed

be/src/common/config.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1037,7 +1037,7 @@ DEFINE_mInt64(big_column_size_buffer, "65535");
10371037
DEFINE_mInt64(small_column_size_buffer, "100");
10381038

10391039
// Perform the always_true check at intervals determined by runtime_filter_sampling_frequency
1040-
DEFINE_mInt32(runtime_filter_sampling_frequency, "64");
1040+
DEFINE_mInt32(runtime_filter_sampling_frequency, "32");
10411041
DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600");
10421042
DEFINE_mBool(execution_ignore_eovercrowded, "true");
10431043
// cooldown task configs

be/src/olap/column_predicate.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "olap/rowset/segment_v2/bloom_filter.h"
2626
#include "olap/rowset/segment_v2/inverted_index_iterator.h"
2727
#include "runtime/define_primitive_type.h"
28+
#include "runtime_filter/runtime_filter_selectivity.h"
2829
#include "util/defer_op.h"
2930
#include "util/runtime_profile.h"
3031
#include "vec/columns/column.h"
@@ -372,8 +373,8 @@ class ColumnPredicate {
372373
if (!_always_true) {
373374
_judge_filter_rows += filter_rows;
374375
_judge_input_rows += input_rows;
375-
vectorized::VRuntimeFilterWrapper::judge_selectivity(
376-
get_ignore_threshold(), _judge_filter_rows, _judge_input_rows, _always_true);
376+
RuntimeFilterSelectivity::judge_selectivity(get_ignore_threshold(), _judge_filter_rows,
377+
_judge_input_rows, _always_true);
377378
}
378379
}
379380

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <cstdint>
21+
22+
#include "common/config.h"
23+
#include "common/logging.h"
24+
25+
namespace doris {
26+
27+
// Used to track the selectivity of runtime filters
28+
// If the selectivity of a runtime filter is very low, it is considered ineffective and can be ignored
29+
// Considering that the selectivity of runtime filters may change with data variations
30+
// A dynamic selectivity tracking mechanism is needed
31+
// Note: this is not a thread-safe class
32+
33+
class RuntimeFilterSelectivity {
34+
public:
35+
RuntimeFilterSelectivity() = default;
36+
37+
RuntimeFilterSelectivity(const RuntimeFilterSelectivity&) = delete;
38+
void update_judge_counter() {
39+
if ((_judge_counter++) >= config::runtime_filter_sampling_frequency) {
40+
reset_judge_selectivity();
41+
}
42+
}
43+
44+
void update_judge_selectivity(int filter_id, uint64_t filter_rows, uint64_t input_rows,
45+
double ignore_thredhold) {
46+
if (!_always_true) {
47+
_judge_filter_rows += filter_rows;
48+
_judge_input_rows += input_rows;
49+
judge_selectivity(ignore_thredhold, _judge_filter_rows, _judge_input_rows,
50+
_always_true);
51+
}
52+
53+
VLOG_ROW << fmt::format(
54+
"Runtime filter[{}] selectivity update: filter_rows: {}, input_rows: {}, filter "
55+
"rate: {}, "
56+
"ignore_thredhold: {}, counter: {} , always_true: {}",
57+
filter_id, _judge_filter_rows, _judge_input_rows,
58+
static_cast<double>(_judge_filter_rows) / static_cast<double>(_judge_input_rows),
59+
ignore_thredhold, _judge_counter, _always_true);
60+
}
61+
62+
bool maybe_always_true_can_ignore() const {
63+
/// TODO: maybe we can use session variable to control this behavior ?
64+
if (config::runtime_filter_sampling_frequency <= 0) {
65+
return false;
66+
} else {
67+
return _always_true;
68+
}
69+
}
70+
71+
static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows,
72+
bool& always_true) {
73+
// if the judged input rows is too small, we think the selectivity is not reliable
74+
if (input_rows > min_judge_input_rows) {
75+
always_true = (static_cast<double>(filter_rows) / static_cast<double>(input_rows)) <
76+
ignore_threshold;
77+
}
78+
}
79+
80+
private:
81+
void reset_judge_selectivity() {
82+
_always_true = false;
83+
_judge_counter = 0;
84+
_judge_input_rows = 0;
85+
_judge_filter_rows = 0;
86+
}
87+
88+
int64_t _judge_input_rows = 0;
89+
int64_t _judge_filter_rows = 0;
90+
int _judge_counter = 0;
91+
bool _always_true = false;
92+
93+
constexpr static int64_t min_judge_input_rows = 4096 * 10;
94+
};
95+
96+
} // namespace doris

be/src/vec/exprs/vexpr.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,5 +1015,60 @@ bool VExpr::ann_dist_is_fulfilled() const {
10151015
return _virtual_column_is_fulfilled;
10161016
}
10171017

1018+
Status VExpr::execute_filter(VExprContext* context, const Block* block,
1019+
uint8_t* __restrict result_filter_data, size_t rows, bool accept_null,
1020+
bool* can_filter_all) const {
1021+
ColumnPtr filter_column;
1022+
RETURN_IF_ERROR(execute_column(context, block, filter_column));
1023+
if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
1024+
// const(nullable) or const(bool)
1025+
const bool result = accept_null
1026+
? (const_column->is_null_at(0) || const_column->get_bool(0))
1027+
: (!const_column->is_null_at(0) && const_column->get_bool(0));
1028+
if (!result) {
1029+
// filter all
1030+
*can_filter_all = true;
1031+
memset(result_filter_data, 0, rows);
1032+
return Status::OK();
1033+
}
1034+
} else if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
1035+
// nullable(bool)
1036+
const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
1037+
const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*nested_column).get_data();
1038+
const auto* __restrict filter_data = filter.data();
1039+
const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
1040+
1041+
if (accept_null) {
1042+
for (size_t i = 0; i < rows; ++i) {
1043+
result_filter_data[i] &= (null_map_data[i]) || filter_data[i];
1044+
}
1045+
} else {
1046+
for (size_t i = 0; i < rows; ++i) {
1047+
result_filter_data[i] &= (!null_map_data[i]) & filter_data[i];
1048+
}
1049+
}
1050+
1051+
if ((memchr(result_filter_data, 0x1, rows) == nullptr)) {
1052+
*can_filter_all = true;
1053+
return Status::OK();
1054+
}
1055+
} else {
1056+
// bool
1057+
const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
1058+
const auto* __restrict filter_data = filter.data();
1059+
1060+
for (size_t i = 0; i < rows; ++i) {
1061+
result_filter_data[i] &= filter_data[i];
1062+
}
1063+
1064+
if (memchr(result_filter_data, 0x1, rows) == nullptr) {
1065+
*can_filter_all = true;
1066+
return Status::OK();
1067+
}
1068+
}
1069+
1070+
return Status::OK();
1071+
}
1072+
10181073
#include "common/compile_check_end.h"
10191074
} // namespace doris::vectorized

be/src/vec/exprs/vexpr.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ class VExpr {
147147
// Therefore we need a function like this to return the actual type produced by execution.
148148
virtual DataTypePtr execute_type(const Block* block) const { return _data_type; }
149149

150+
virtual Status execute_filter(VExprContext* context, const Block* block,
151+
uint8_t* __restrict result_filter_data, size_t rows,
152+
bool accept_null, bool* can_filter_all) const;
153+
150154
// `is_blockable` means this expr will be blocked in `execute` (e.g. AI Function, Remote Function)
151155
[[nodiscard]] virtual bool is_blockable() const {
152156
return std::any_of(_children.begin(), _children.end(),
@@ -204,12 +208,6 @@ class VExpr {
204208
[](VExprSPtr child) { return child->is_rf_wrapper(); });
205209
}
206210

207-
virtual void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) {
208-
for (auto child : _children) {
209-
child->do_judge_selectivity(filter_rows, input_rows);
210-
}
211-
}
212-
213211
static Status create_expr_tree(const TExpr& texpr, VExprContextSPtr& ctx);
214212

215213
static Status create_expr_trees(const std::vector<TExpr>& texprs, VExprContextSPtrs& ctxs);

be/src/vec/exprs/vexpr_context.cpp

Lines changed: 10 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,12 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
199199
return execute_conjuncts(ctxs, filters, false, block, result_filter, can_filter_all);
200200
}
201201

202-
// TODO: Performance Optimization
202+
Status VExprContext::execute_filter(const Block* block, uint8_t* __restrict result_filter_data,
203+
size_t rows, bool accept_null, bool* can_filter_all) {
204+
return _root->execute_filter(this, block, result_filter_data, rows, accept_null,
205+
can_filter_all);
206+
}
207+
203208
Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
204209
const std::vector<IColumn::Filter*>* filters,
205210
bool accept_null, const Block* block,
@@ -209,85 +214,10 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
209214
*can_filter_all = false;
210215
auto* __restrict result_filter_data = result_filter->data();
211216
for (const auto& ctx : ctxs) {
212-
// Statistics are only required when an rf wrapper exists in the expr.
213-
bool is_rf_wrapper = ctx->root()->is_rf_wrapper();
214-
ColumnPtr filter_column;
215-
RETURN_IF_ERROR(ctx->execute(block, filter_column));
216-
if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
217-
size_t column_size = nullable_column->size();
218-
if (column_size == 0) {
219-
*can_filter_all = true;
220-
return Status::OK();
221-
} else {
222-
const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
223-
const IColumn::Filter& filter =
224-
assert_cast<const ColumnUInt8&>(*nested_column).get_data();
225-
const auto* __restrict filter_data = filter.data();
226-
const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
227-
228-
size_t input_rows =
229-
rows - (is_rf_wrapper
230-
? simd::count_zero_num((int8_t*)result_filter_data, rows)
231-
: 0);
232-
233-
if (accept_null) {
234-
for (size_t i = 0; i < rows; ++i) {
235-
result_filter_data[i] &= (null_map_data[i]) || filter_data[i];
236-
}
237-
} else {
238-
for (size_t i = 0; i < rows; ++i) {
239-
result_filter_data[i] &= (!null_map_data[i]) & filter_data[i];
240-
}
241-
}
242-
243-
size_t output_rows =
244-
rows - (is_rf_wrapper
245-
? simd::count_zero_num((int8_t*)result_filter_data, rows)
246-
: 0);
247-
248-
if (is_rf_wrapper) {
249-
ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows);
250-
}
251-
252-
if ((is_rf_wrapper && output_rows == 0) ||
253-
(!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) {
254-
*can_filter_all = true;
255-
return Status::OK();
256-
}
257-
}
258-
} else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
259-
// filter all
260-
if (!const_column->get_bool(0)) {
261-
*can_filter_all = true;
262-
memset(result_filter_data, 0, result_filter->size());
263-
return Status::OK();
264-
}
265-
} else {
266-
const IColumn::Filter& filter =
267-
assert_cast<const ColumnUInt8&>(*filter_column).get_data();
268-
const auto* __restrict filter_data = filter.data();
269-
270-
size_t input_rows =
271-
rows -
272-
(is_rf_wrapper ? simd::count_zero_num((int8_t*)result_filter_data, rows) : 0);
273-
274-
for (size_t i = 0; i < rows; ++i) {
275-
result_filter_data[i] &= filter_data[i];
276-
}
277-
278-
size_t output_rows =
279-
rows -
280-
(is_rf_wrapper ? simd::count_zero_num((int8_t*)result_filter_data, rows) : 0);
281-
282-
if (is_rf_wrapper) {
283-
ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows);
284-
}
285-
286-
if ((is_rf_wrapper && output_rows == 0) ||
287-
(!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) {
288-
*can_filter_all = true;
289-
return Status::OK();
290-
}
217+
RETURN_IF_ERROR(
218+
ctx->execute_filter(block, result_filter_data, rows, accept_null, can_filter_all));
219+
if (*can_filter_all) {
220+
return Status::OK();
291221
}
292222
}
293223
if (filters != nullptr) {

be/src/vec/exprs/vexpr_context.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "olap/rowset/segment_v2/inverted_index_reader.h"
3434
#include "runtime/runtime_state.h"
3535
#include "runtime/types.h"
36+
#include "runtime_filter/runtime_filter_selectivity.h"
3637
#include "udf/udf.h"
3738
#include "vec/columns/column.h"
3839
#include "vec/core/block.h"
@@ -210,6 +211,9 @@ class VExprContext {
210211

211212
bool all_expr_inverted_index_evaluated();
212213

214+
Status execute_filter(const Block* block, uint8_t* __restrict result_filter_data, size_t rows,
215+
bool accept_null, bool* can_filter_all);
216+
213217
[[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block);
214218

215219
[[nodiscard]] static Status filter_block(const VExprContextSPtrs& expr_contexts, Block* block,
@@ -246,6 +250,8 @@ class VExprContext {
246250
return _last_result_column_id;
247251
}
248252

253+
RuntimeFilterSelectivity& get_runtime_filter_selectivity() { return *_rf_selectivity; }
254+
249255
FunctionContext::FunctionStateScope get_function_state_scope() const {
250256
return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL;
251257
}
@@ -337,5 +343,8 @@ class VExprContext {
337343

338344
segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime;
339345
bool _suitable_for_ann_index = true;
346+
347+
std::unique_ptr<RuntimeFilterSelectivity> _rf_selectivity =
348+
std::make_unique<RuntimeFilterSelectivity>();
340349
};
341350
} // namespace doris::vectorized

0 commit comments

Comments
 (0)