Skip to content

Commit e25ddfd

Browse files
committed
[Enhancement](multi-catalog) Add PredicateFilterTime, DictFilterRewriteTime, and LazyReadFilteredRows profile metrics to the Parquet and ORC reader profiles.
Cherry-pick apache#51248
1 parent 67bf3ee commit e25ddfd

File tree

6 files changed

+175
-137
lines changed

6 files changed

+175
-137
lines changed

be/src/vec/exec/format/orc/vorc_reader.cpp

Lines changed: 74 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,9 @@ void OrcReader::_collect_profile_before_close() {
200200
COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
201201
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
202202
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
203-
COUNTER_UPDATE(_orc_profile.filter_block_time, _statistics.filter_block_time);
203+
COUNTER_UPDATE(_orc_profile.predicate_filter_time, _statistics.predicate_filter_time);
204+
COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
205+
COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows);
204206

205207
if (_file_input_stream != nullptr) {
206208
_file_input_stream->collect_profile_before_close();
@@ -234,8 +236,16 @@ void OrcReader::_init_profile() {
234236
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1);
235237
_orc_profile.decode_null_map_time =
236238
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1);
237-
_orc_profile.filter_block_time =
238-
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FilterBlockTime", orc_profile, 1);
239+
_orc_profile.predicate_filter_time =
240+
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", orc_profile, 1);
241+
_orc_profile.dict_filter_rewrite_time =
242+
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime", orc_profile, 1);
243+
_orc_profile.lazy_read_filtered_rows =
244+
ADD_COUNTER_WITH_LEVEL(_profile, "FilteredRowsByLazyRead", TUnit::UNIT, 1);
245+
_orc_profile.selected_row_group_count =
246+
ADD_COUNTER_WITH_LEVEL(_profile, "SelectedRowGroupCount", TUnit::UNIT, 1);
247+
_orc_profile.evaluated_row_group_count =
248+
ADD_COUNTER_WITH_LEVEL(_profile, "EvaluatedRowGroupCount", TUnit::UNIT, 1);
239249
}
240250
}
241251

@@ -1714,15 +1724,18 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
17141724
*read_rows = 0;
17151725
return Status::OK();
17161726
}
1717-
_execute_filter_position_delete_rowids(*_filter);
17181727
{
1719-
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
1720-
RETURN_IF_CATCH_EXCEPTION(
1721-
Block::filter_block_internal(block, columns_to_filter, *_filter));
1728+
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
1729+
_execute_filter_position_delete_rowids(*_filter);
1730+
{
1731+
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
1732+
RETURN_IF_CATCH_EXCEPTION(
1733+
Block::filter_block_internal(block, columns_to_filter, *_filter));
1734+
}
1735+
Block::erase_useless_column(block, column_to_keep);
1736+
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
1737+
*read_rows = block->rows();
17221738
}
1723-
Block::erase_useless_column(block, column_to_keep);
1724-
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
1725-
*read_rows = block->rows();
17261739
} else {
17271740
uint64_t rr;
17281741
SCOPED_RAW_TIMER(&_statistics.column_read_time);
@@ -1799,63 +1812,60 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
17991812
return Status::OK();
18001813
}
18011814

1802-
_build_delete_row_filter(block, _batch->numElements);
1803-
1804-
std::vector<uint32_t> columns_to_filter;
1805-
int column_to_keep = block->columns();
1806-
columns_to_filter.resize(column_to_keep);
1807-
for (uint32_t i = 0; i < column_to_keep; ++i) {
1808-
columns_to_filter[i] = i;
1809-
}
1810-
if (!_lazy_read_ctx.conjuncts.empty()) {
1811-
VExprContextSPtrs filter_conjuncts;
1812-
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
1813-
_filter_conjuncts.end());
1814-
for (auto& conjunct : _dict_filter_conjuncts) {
1815-
filter_conjuncts.emplace_back(conjunct);
1816-
}
1817-
for (auto& conjunct : _non_dict_filter_conjuncts) {
1818-
filter_conjuncts.emplace_back(conjunct);
1819-
}
1820-
std::vector<IColumn::Filter*> filters;
1821-
if (_delete_rows_filter_ptr) {
1822-
filters.push_back(_delete_rows_filter_ptr.get());
1823-
}
1824-
IColumn::Filter result_filter(block->rows(), 1);
1825-
bool can_filter_all = false;
1826-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
1827-
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
1828-
if (can_filter_all) {
1829-
for (auto& col : columns_to_filter) {
1830-
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
1815+
{
1816+
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
1817+
_build_delete_row_filter(block, _batch->numElements);
1818+
1819+
std::vector<uint32_t> columns_to_filter;
1820+
int column_to_keep = block->columns();
1821+
columns_to_filter.resize(column_to_keep);
1822+
for (uint32_t i = 0; i < column_to_keep; ++i) {
1823+
columns_to_filter[i] = i;
1824+
}
1825+
if (!_lazy_read_ctx.conjuncts.empty()) {
1826+
VExprContextSPtrs filter_conjuncts;
1827+
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
1828+
_filter_conjuncts.end());
1829+
for (auto& conjunct : _dict_filter_conjuncts) {
1830+
filter_conjuncts.emplace_back(conjunct);
18311831
}
1832-
Block::erase_useless_column(block, column_to_keep);
1833-
return _convert_dict_cols_to_string_cols(block, &batch_vec);
1834-
}
1835-
_execute_filter_position_delete_rowids(result_filter);
1836-
{
1837-
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
1832+
for (auto& conjunct : _non_dict_filter_conjuncts) {
1833+
filter_conjuncts.emplace_back(conjunct);
1834+
}
1835+
std::vector<IColumn::Filter*> filters;
1836+
if (_delete_rows_filter_ptr) {
1837+
filters.push_back(_delete_rows_filter_ptr.get());
1838+
}
1839+
IColumn::Filter result_filter(block->rows(), 1);
1840+
bool can_filter_all = false;
1841+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
1842+
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
1843+
if (can_filter_all) {
1844+
for (auto& col : columns_to_filter) {
1845+
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
1846+
}
1847+
Block::erase_useless_column(block, column_to_keep);
1848+
return _convert_dict_cols_to_string_cols(block, &batch_vec);
1849+
}
1850+
_execute_filter_position_delete_rowids(result_filter);
18381851
RETURN_IF_CATCH_EXCEPTION(
18391852
Block::filter_block_internal(block, columns_to_filter, result_filter));
1840-
}
1841-
Block::erase_useless_column(block, column_to_keep);
1842-
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
1843-
} else {
1844-
if (_delete_rows_filter_ptr) {
1845-
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
1846-
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
1847-
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
1848-
(*_delete_rows_filter_ptr)));
1853+
Block::erase_useless_column(block, column_to_keep);
18491854
} else {
1850-
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
1851-
_execute_filter_position_delete_rowids(*filter);
1852-
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
1853-
RETURN_IF_CATCH_EXCEPTION(
1854-
Block::filter_block_internal(block, columns_to_filter, (*filter)));
1855+
if (_delete_rows_filter_ptr) {
1856+
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
1857+
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
1858+
block, columns_to_filter, (*_delete_rows_filter_ptr)));
1859+
} else {
1860+
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
1861+
_execute_filter_position_delete_rowids(*filter);
1862+
RETURN_IF_CATCH_EXCEPTION(
1863+
Block::filter_block_internal(block, columns_to_filter, (*filter)));
1864+
}
1865+
Block::erase_useless_column(block, column_to_keep);
18551866
}
1856-
Block::erase_useless_column(block, column_to_keep);
1857-
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
18581867
}
1868+
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
18591869
*read_rows = block->rows();
18601870
}
18611871
return Status::OK();
@@ -1899,6 +1909,7 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
18991909
}
19001910

19011911
Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
1912+
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
19021913
Block* block = (Block*)arg;
19031914
size_t origin_column_num = block->columns();
19041915

@@ -1999,6 +2010,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
19992010
sel[new_size] = i;
20002011
new_size += result_filter_data[i] ? 1 : 0;
20012012
}
2013+
_statistics.lazy_read_filtered_rows += static_cast<int64_t>(size - new_size);
20022014
data.numElements = new_size;
20032015
return Status::OK();
20042016
}
@@ -2072,6 +2084,7 @@ bool OrcReader::_can_filter_by_dict(int slot_id) {
20722084
Status OrcReader::on_string_dicts_loaded(
20732085
std::unordered_map<std::string, orc::StringDictionary*>& file_column_name_to_dict_map,
20742086
bool* is_stripe_filtered) {
2087+
SCOPED_RAW_TIMER(&_statistics.dict_filter_rewrite_time);
20752088
*is_stripe_filtered = false;
20762089
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
20772090
std::string& dict_filter_col_name = it->first;

be/src/vec/exec/format/orc/vorc_reader.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ class OrcReader : public GenericReader {
128128
int64_t set_fill_column_time = 0;
129129
int64_t decode_value_time = 0;
130130
int64_t decode_null_map_time = 0;
131-
int64_t filter_block_time = 0;
131+
int64_t predicate_filter_time = 0;
132+
int64_t dict_filter_rewrite_time = 0;
133+
int64_t lazy_read_filtered_rows = 0;
132134
};
133135

134136
OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
@@ -226,7 +228,11 @@ class OrcReader : public GenericReader {
226228
RuntimeProfile::Counter* set_fill_column_time = nullptr;
227229
RuntimeProfile::Counter* decode_value_time = nullptr;
228230
RuntimeProfile::Counter* decode_null_map_time = nullptr;
229-
RuntimeProfile::Counter* filter_block_time = nullptr;
231+
RuntimeProfile::Counter* predicate_filter_time = nullptr;
232+
RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
233+
RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
234+
RuntimeProfile::Counter* selected_row_group_count = nullptr;
235+
RuntimeProfile::Counter* evaluated_row_group_count = nullptr;
230236
};
231237

232238
class ORCFilterImpl : public orc::ORCFilter {

0 commit comments

Comments
 (0)