Skip to content

Commit 3266935

Browse files
authored
[fix](parquet)fix parquet reader lazy materialization cannot filter. (#60474)
### What problem does this PR solve? Related PR: #60197 Problem Summary: This fix Parquet reader lazy materialization invalid issue in PR #60197 caused by the removal of feature #59053.
1 parent d489a79 commit 3266935

File tree

4 files changed

+920
-11
lines changed

4 files changed

+920
-11
lines changed

be/src/vec/exec/scan/file_scanner.cpp

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -352,15 +352,11 @@ Status FileScanner::_process_conjuncts() {
352352
return Status::OK();
353353
}
354354

355-
Status FileScanner::_process_late_arrival_conjuncts(bool* changed,
356-
VExprContextSPtrs& new_push_down_conjuncts) {
357-
*changed = false;
355+
Status FileScanner::_process_late_arrival_conjuncts() {
358356
if (_push_down_conjuncts.size() < _conjuncts.size()) {
359-
*changed = true;
360357
_push_down_conjuncts = _conjuncts;
361358
_conjuncts.clear();
362359
RETURN_IF_ERROR(_process_conjuncts());
363-
new_push_down_conjuncts = _push_down_conjuncts;
364360
}
365361
if (_applied_rf_num == _total_rf_num) {
366362
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
@@ -1038,7 +1034,9 @@ Status FileScanner::_get_next_reader() {
10381034
// ATTN: the push down agg type may be set back to NONE,
10391035
// see IcebergTableReader::init_row_filters for example.
10401036
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1041-
1037+
if (push_down_predicates) {
1038+
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1039+
}
10421040
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
10431041

10441042
need_to_get_parsed_schema = true;
@@ -1059,9 +1057,7 @@ Status FileScanner::_get_next_reader() {
10591057

10601058
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
10611059
if (push_down_predicates) {
1062-
bool changed = false;
1063-
VExprContextSPtrs new_push_down_conjuncts;
1064-
RETURN_IF_ERROR(_process_late_arrival_conjuncts(&changed, new_push_down_conjuncts));
1060+
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
10651061
}
10661062
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
10671063

be/src/vec/exec/scan/file_scanner.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,7 @@ class FileScanner : public Scanner {
252252
void _init_runtime_filter_partition_prune_block();
253253
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
254254
Status _process_conjuncts();
255-
Status _process_late_arrival_conjuncts(bool* changed,
256-
VExprContextSPtrs& new_push_down_conjuncts);
255+
Status _process_late_arrival_conjuncts();
257256
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
258257
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
259258
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import groovy.json.JsonSlurper
19+
20+
suite("test_orc_lazy_mat_profile", "p0,external,hive,external_docker,external_docker_hive") {
21+
def getProfileList = {
22+
def dst = 'http://' + context.config.feHttpAddress
23+
def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
24+
conn.setRequestMethod("GET")
25+
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
26+
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
27+
conn.setRequestProperty("Authorization", "Basic ${encoding}")
28+
return conn.getInputStream().getText()
29+
}
30+
31+
def getProfile = { id ->
32+
def dst = 'http://' + context.config.feHttpAddress
33+
def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
34+
conn.setRequestMethod("GET")
35+
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
36+
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
37+
conn.setRequestProperty("Authorization", "Basic ${encoding}")
38+
return conn.getInputStream().getText()
39+
}
40+
41+
def getProfileWithToken = { token ->
42+
String profileId = ""
43+
int attempts = 0
44+
while (attempts < 10 && (profileId == null || profileId == "")) {
45+
List profileData = new JsonSlurper().parseText(getProfileList()).data.rows
46+
for (def profileItem in profileData) {
47+
if (profileItem["Sql Statement"].toString().contains(token)) {
48+
profileId = profileItem["Profile ID"].toString()
49+
break
50+
}
51+
}
52+
if (profileId == null || profileId == "") {
53+
Thread.sleep(300)
54+
}
55+
attempts++
56+
}
57+
assertTrue(profileId != null && profileId != "")
58+
Thread.sleep(800)
59+
return getProfile(profileId).toString()
60+
}
61+
62+
def extractProfileBlockMetrics = {String profileText, String blockName ->
63+
List<String> lines = profileText.readLines()
64+
65+
Map<String, String> metrics = [:]
66+
boolean inBlock = false
67+
int blockIndent = -1
68+
69+
lines.each { line ->
70+
if (!inBlock) {
71+
def m = line =~ /^(\s*)-\s+${Pattern.quote(blockName)}:/
72+
if (m.find()) {
73+
inBlock = true
74+
blockIndent = m.group(1).length()
75+
}
76+
} else {
77+
// 当前行缩进
78+
def indent = (line =~ /^(\s*)/)[0][1].length()
79+
80+
if (indent > blockIndent) {
81+
def kv = line =~ /^\s*-\s*([^:]+):\s*(.+)$/
82+
if (kv.matches()) {
83+
metrics[kv[0][1].trim()] = kv[0][2].trim()
84+
}
85+
} else {
86+
// 缩进回退,block 结束
87+
inBlock = false
88+
}
89+
}
90+
}
91+
92+
return metrics
93+
}
94+
95+
def extractProfileValue = { String profileText, String keyName ->
96+
def matcher = profileText =~ /(?m)^\s*-\s*${keyName}:\s*(.+)$/
97+
return matcher.find() ? matcher.group(1).trim() : null
98+
}
99+
100+
// session vars
101+
sql "unset variable all;"
102+
sql "set profile_level=2;"
103+
sql "set enable_profile=true;"
104+
sql " set parallel_pipeline_task_num = 1;"
105+
sql " set file_split_size = 10000000;"
106+
sql """set max_file_scanners_concurrency = 1; """
107+
108+
String enabled = context.config.otherConfigs.get("enableHiveTest")
109+
if (!"true".equalsIgnoreCase(enabled)) {
110+
return;
111+
}
112+
113+
for (String hivePrefix : ["hive2"]) {
114+
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
115+
String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
116+
String catalog_name = "test_orc_lazy_mat_profile"
117+
118+
sql """drop catalog if exists ${catalog_name};"""
119+
sql """
120+
create catalog if not exists ${catalog_name} properties (
121+
'type'='hms',
122+
'hadoop.username' = 'hadoop',
123+
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}'
124+
);
125+
"""
126+
logger.info("catalog " + catalog_name + " created")
127+
sql """switch ${catalog_name};"""
128+
logger.info("switched to catalog " + catalog_name)
129+
130+
sql """ use `global_lazy_mat_db` """
131+
132+
def q1 = {
133+
def t1 = UUID.randomUUID().toString()
134+
135+
def sql_result = sql """
136+
select *, "${t1}" from orc_topn_lazy_mat_table where file_id = 1 and id = 1;
137+
"""
138+
logger.info("sql_result = ${sql_result}");
139+
return getProfileWithToken(t1);
140+
}
141+
142+
143+
def q2 = {
144+
def t1 = UUID.randomUUID().toString()
145+
146+
def sql_result = sql """
147+
select *, "${t1}" from orc_topn_lazy_mat_table where file_id = 1 and id <= 2;
148+
"""
149+
logger.info("sql_result = ${sql_result}");
150+
return getProfileWithToken(t1);
151+
}
152+
153+
def q3 = {
154+
def t1 = UUID.randomUUID().toString()
155+
156+
def sql_result = sql """
157+
select *, "${t1}" from orc_topn_lazy_mat_table where file_id = 1 and id <= 3;
158+
"""
159+
logger.info("sql_result = ${sql_result}");
160+
return getProfileWithToken(t1);
161+
}
162+
163+
def q4 = {
164+
def t1 = UUID.randomUUID().toString()
165+
166+
def sql_result = sql """
167+
select *, "${t1}" from orc_topn_lazy_mat_table where file_id = 1 and id < 0;
168+
"""
169+
logger.info("sql_result = ${sql_result}");
170+
return getProfileWithToken(t1);
171+
}
172+
173+
def test_true_true = {
174+
sql " set enable_orc_filter_by_min_max = true; "
175+
sql " set enable_orc_lazy_materialization = true; "
176+
177+
def profileStr = q1()
178+
logger.info("profileStr = \n${profileStr}");
179+
assertEquals("2", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
180+
assertEquals("3", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
181+
assertEquals("1", extractProfileValue(profileStr, "SelectedRowGroupCount"))
182+
183+
profileStr = q2()
184+
logger.info("profileStr = \n${profileStr}");
185+
assertEquals("1", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
186+
assertEquals("3", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
187+
assertEquals("1", extractProfileValue(profileStr, "SelectedRowGroupCount"))
188+
189+
profileStr = q3()
190+
logger.info("profileStr = \n${profileStr}");
191+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
192+
assertEquals("3", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
193+
assertEquals("1", extractProfileValue(profileStr, "SelectedRowGroupCount"))
194+
195+
profileStr = q4()
196+
logger.info("profileStr = \n${profileStr}");
197+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
198+
assertEquals("3", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
199+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
200+
}
201+
test_true_true();
202+
203+
204+
def test_true_false = {
205+
sql " set enable_orc_filter_by_min_max = true; "
206+
sql " set enable_orc_lazy_materialization = false; "
207+
208+
def profileStr = q1()
209+
logger.info("profileStr = \n${profileStr}");
210+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
211+
assertEquals("3", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
212+
assertEquals("1", extractProfileValue(profileStr, "SelectedRowGroupCount"))
213+
214+
profileStr = q2()
215+
logger.info("profileStr = \n${profileStr}");
216+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
217+
assertEquals("3", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
218+
assertEquals("1", extractProfileValue(profileStr, "SelectedRowGroupCount"))
219+
220+
profileStr = q3()
221+
logger.info("profileStr = \n${profileStr}");
222+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
223+
assertEquals("3", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
224+
assertEquals("1", extractProfileValue(profileStr, "SelectedRowGroupCount"))
225+
226+
profileStr = q4()
227+
logger.info("profileStr = \n${profileStr}");
228+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
229+
assertEquals("3", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
230+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
231+
}
232+
test_true_false();
233+
234+
235+
def test_false_false = {
236+
sql " set enable_orc_filter_by_min_max = false; "
237+
sql " set enable_orc_lazy_materialization = false; "
238+
239+
def profileStr = q1()
240+
logger.info("profileStr = \n${profileStr}");
241+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
242+
assertEquals("0", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
243+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
244+
245+
profileStr = q2()
246+
logger.info("profileStr = \n${profileStr}");
247+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
248+
assertEquals("0", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
249+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
250+
251+
profileStr = q3()
252+
logger.info("profileStr = \n${profileStr}");
253+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
254+
assertEquals("0", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
255+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
256+
257+
profileStr = q4()
258+
logger.info("profileStr = \n${profileStr}");
259+
assertEquals("0", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
260+
assertEquals("0", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
261+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
262+
}
263+
test_false_false();
264+
265+
266+
267+
def test_false_true = {
268+
sql " set enable_orc_filter_by_min_max = false; "
269+
sql " set enable_orc_lazy_materialization = true; "
270+
271+
def profileStr = q1()
272+
logger.info("profileStr = \n${profileStr}");
273+
assertEquals("8", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
274+
assertEquals("0", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
275+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
276+
277+
profileStr = q2()
278+
logger.info("profileStr = \n${profileStr}");
279+
assertEquals("7", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
280+
assertEquals("0", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
281+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
282+
283+
profileStr = q3()
284+
logger.info("profileStr = \n${profileStr}");
285+
assertEquals("6", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
286+
assertEquals("0", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
287+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
288+
289+
290+
profileStr = q4()
291+
logger.info("profileStr = \n${profileStr}");
292+
assertEquals("9", extractProfileValue(profileStr, "FilteredRowsByLazyRead"))
293+
assertEquals("0", extractProfileValue(profileStr, "EvaluatedRowGroupCount"))
294+
assertEquals("0", extractProfileValue(profileStr, "SelectedRowGroupCount"))
295+
}
296+
test_false_true();
297+
298+
299+
300+
301+
302+
303+
sql """drop catalog ${catalog_name};"""
304+
}
305+
306+
307+
308+
309+
310+
}

0 commit comments

Comments
 (0)