Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,19 @@ class ColumnValueRange {

int scale() const { return _scale; }

static void add_fixed_value_range(ColumnValueRange<primitive_type>& range,
static void add_fixed_value_range(ColumnValueRange<primitive_type>& range, SQLFilterOp op,
const CppType* value) {
static_cast<void>(range.add_fixed_value(*value));
}

static void remove_fixed_value_range(ColumnValueRange<primitive_type>& range,
static void remove_fixed_value_range(ColumnValueRange<primitive_type>& range, SQLFilterOp op,
const CppType* value) {
range.remove_fixed_value(*value);
}

static void empty_function(ColumnValueRange<primitive_type>& range, SQLFilterOp op,
const CppType* value) {}

static void add_value_range(ColumnValueRange<primitive_type>& range, SQLFilterOp op,
const CppType* value) {
static_cast<void>(range.add_range(op, *value));
Expand Down
39 changes: 5 additions & 34 deletions be/src/exec/olap_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,43 +67,14 @@ enum SQLFilterOp {
FILTER_LESS = 2,
FILTER_LESS_OR_EQUAL = 3,
FILTER_IN = 4,
FILTER_NOT_IN = 5
FILTER_NOT_IN = 5,
FILTER_EQ = 6,
FILTER_NE = 7
};

template <PrimitiveType>
constexpr bool always_false_v = false;

inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) {
switch (type) {
case TExprOpcode::LT:
return opposite ? FILTER_LARGER : FILTER_LESS;

case TExprOpcode::LE:
return opposite ? FILTER_LARGER_OR_EQUAL : FILTER_LESS_OR_EQUAL;

case TExprOpcode::GT:
return opposite ? FILTER_LESS : FILTER_LARGER;

case TExprOpcode::GE:
return opposite ? FILTER_LESS_OR_EQUAL : FILTER_LARGER_OR_EQUAL;

case TExprOpcode::EQ:
return opposite ? FILTER_NOT_IN : FILTER_IN;

case TExprOpcode::NE:
return opposite ? FILTER_IN : FILTER_NOT_IN;

case TExprOpcode::EQ_FOR_NULL:
return FILTER_IN;

default:
VLOG_CRITICAL << "TExprOpcode: " << type;
DCHECK(false);
}

return FILTER_IN;
}

inline SQLFilterOp to_olap_filter_type(const std::string& function_name) {
if (function_name == "lt") {
return FILTER_LESS;
Expand All @@ -114,9 +85,9 @@ inline SQLFilterOp to_olap_filter_type(const std::string& function_name) {
} else if (function_name == "ge") {
return FILTER_LARGER_OR_EQUAL;
} else if (function_name == "eq") {
return FILTER_IN;
return FILTER_EQ;
} else if (function_name == "ne") {
return FILTER_NOT_IN;
return FILTER_NE;
} else if (function_name == "in") {
return FILTER_IN;
} else if (function_name == "not_in") {
Expand Down
44 changes: 0 additions & 44 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,50 +55,6 @@ PushDownType FileScanLocalState::_should_push_down_binary_predicate(
}
}

bool FileScanLocalState::_should_push_down_or_predicate_recursively(
const vectorized::VExprSPtr& expr) const {
if (expr->node_type() == TExprNodeType::COMPOUND_PRED &&
expr->op() == TExprOpcode::COMPOUND_OR) {
return std::ranges::all_of(expr->children(), [this](const vectorized::VExprSPtr& it) {
return _should_push_down_or_predicate_recursively(it);
});
} else if (expr->node_type() == TExprNodeType::COMPOUND_PRED &&
expr->op() == TExprOpcode::COMPOUND_AND) {
return std::ranges::any_of(expr->children(), [this](const vectorized::VExprSPtr& it) {
return _should_push_down_or_predicate_recursively(it);
});
} else {
auto children = expr->children();
if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
}
std::shared_ptr<vectorized::VSlotRef> slot_ref =
std::dynamic_pointer_cast<vectorized::VSlotRef>(children[0]);
auto entry = _slot_id_to_predicates.find(slot_ref->slot_id());
if (_slot_id_to_predicates.end() == entry) {
return false;
}
if (is_complex_type(slot_ref->data_type()->get_primitive_type())) {
return false;
}
return true;
}
}

PushDownType FileScanLocalState::_should_push_down_or_predicate(
const vectorized::VExprContext* expr_ctx) const {
// TODO(gabriel): Do not push down OR predicate for the time being.
// auto expr = expr_ctx->root()->get_impl() ? expr_ctx->root()->get_impl() : expr_ctx->root();
// if (expr->node_type() == TExprNodeType::COMPOUND_PRED &&
// expr->op() == TExprOpcode::COMPOUND_OR) {
// if (_should_push_down_or_predicate_recursively(expr)) {
// return PushDownType::PARTIAL_ACCEPTABLE;
// }
// }
return PushDownType::UNACCEPTABLE;
}

int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
PushDownType _should_push_down_binary_predicate(
vectorized::VectorizedFnCall* fn_call, vectorized::VExprContext* expr_ctx,
StringRef* constant_val, const std::set<std::string> fn_name) const override;
PushDownType _should_push_down_or_predicate(
const vectorized::VExprContext* expr_ctx) const override;
bool _should_push_down_or_predicate_recursively(const vectorized::VExprSPtr& expr) const;
std::shared_ptr<vectorized::SplitSourceConnector> _split_source = nullptr;
int _max_scanners;
// A in memory cache to save some common components
Expand Down
32 changes: 0 additions & 32 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,19 +450,6 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
return Status::OK();
}
SCOPED_TIMER(_scanner_init_timer);

if (!_conjuncts.empty() && _state->enable_profile()) {
std::string message;
for (auto& conjunct : _conjuncts) {
if (conjunct->root()) {
if (!message.empty()) {
message += ", ";
}
message += conjunct->root()->debug_string();
}
}
custom_profile()->add_info_string("RemainedDownPredicates", message);
}
auto& p = _parent->cast<OlapScanOperatorX>();

for (auto uid : p._olap_scan_node.output_column_unique_ids) {
Expand Down Expand Up @@ -834,23 +821,6 @@ void OlapScanLocalState::set_scan_ranges(RuntimeState* state,
}
}

static std::string predicates_to_string(
const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates) {
fmt::memory_buffer debug_string_buffer;
for (const auto& [slot_id, predicates] : slot_id_to_predicates) {
if (predicates.empty()) {
continue;
}
fmt::format_to(debug_string_buffer, "Slot ID: {}: [", slot_id);
for (const auto& predicate : predicates) {
fmt::format_to(debug_string_buffer, "{{{}}}, ", predicate->debug_string());
}
fmt::format_to(debug_string_buffer, "] ");
}
return fmt::to_string(debug_string_buffer);
}

static std::string tablets_id_to_string(
const std::vector<std::unique_ptr<TPaloScanRange>>& scan_ranges) {
if (scan_ranges.empty()) {
Expand Down Expand Up @@ -960,8 +930,6 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
}

if (state()->enable_profile()) {
custom_profile()->add_info_string("PushDownPredicates",
predicates_to_string(_slot_id_to_predicates));
custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string());
custom_profile()->add_info_string("TabletIds", tablets_id_to_string(_scan_ranges));
}
Expand Down
Loading
Loading