Skip to content

Commit 6fecdf7

Browse files
authored
fix(query): fix lazy topn in srf functions (#17945)
* fix(query): fix lazy topn in srf functions * update
1 parent 6b9013f commit 6fecdf7

File tree

10 files changed

+43
-8
lines changed

10 files changed

+43
-8
lines changed

src/query/service/tests/it/pipelines/filter/random_filter_expr.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ fn convert_predicate_tree_to_scalar_expr(node: PredicateNode, data_type: &DataTy
152152
data_type: Box::new(data_type.clone()),
153153
visibility: Visibility::Visible,
154154
virtual_expr: None,
155+
is_srf: false,
155156
};
156157
let scalar_expr = ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column });
157158
ScalarExpr::FunctionCall(FunctionCall {

src/query/service/tests/it/sql/planner/optimizer/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ pub fn create_table_bound_column_ref(
123123
table_index,
124124
visibility: Visibility::Visible,
125125
virtual_expr: None,
126+
is_srf: false,
126127
};
127128
ScalarExpr::BoundColumnRef(BoundColumnRef { column, span: None })
128129
}

src/query/sql/src/planner/binder/bind_context.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,16 @@ impl BindContext {
515515
.collect()
516516
}
517517

518+
pub fn has_srf_recursive(&self) -> bool {
519+
self.columns.iter().any(|x| x.is_srf)
520+
|| !self.srf_info.srfs.is_empty()
521+
|| self
522+
.parent
523+
.as_ref()
524+
.map(|p| p.has_srf_recursive())
525+
.unwrap_or(false)
526+
}
527+
518528
/// Return data scheme.
519529
pub fn output_schema(&self) -> DataSchemaRef {
520530
let fields = self

src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ impl Binder {
285285
Box::new(data_type),
286286
Visibility::Visible,
287287
)
288+
.is_srf(true)
288289
.build();
289290
bind_context.add_column_binding(column_binding);
290291

src/query/sql/src/planner/binder/column_binding.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ pub struct ColumnBinding {
3939
// Opitonal virtual expr, used by virtual computed column and variant virtual column,
4040
// `virtual_expr` will be parsed and bind to a `ScalarExpr`.
4141
pub virtual_expr: Option<String>,
42+
// A flag indicates the column binding from srf or not
43+
pub is_srf: bool,
4244
}
4345

4446
const DUMMY_INDEX: usize = usize::MAX;
@@ -77,6 +79,7 @@ impl ColumnBinding {
7779
data_type,
7880
visibility: Visibility::Visible,
7981
virtual_expr: None,
82+
is_srf: false,
8083
}
8184
}
8285

@@ -110,6 +113,7 @@ pub struct ColumnBindingBuilder {
110113
pub visibility: Visibility,
111114

112115
pub virtual_expr: Option<String>,
116+
pub is_srf: bool,
113117
}
114118

115119
impl ColumnBindingBuilder {
@@ -129,6 +133,7 @@ impl ColumnBindingBuilder {
129133
data_type,
130134
visibility,
131135
virtual_expr: None,
136+
is_srf: false,
132137
}
133138
}
134139

@@ -157,6 +162,11 @@ impl ColumnBindingBuilder {
157162
self
158163
}
159164

165+
pub fn is_srf(mut self, is_srf: bool) -> ColumnBindingBuilder {
166+
self.is_srf = is_srf;
167+
self
168+
}
169+
160170
pub fn build(self) -> ColumnBinding {
161171
ColumnBinding {
162172
database_name: self.database_name,
@@ -168,6 +178,7 @@ impl ColumnBindingBuilder {
168178
data_type: self.data_type,
169179
visibility: self.visibility,
170180
virtual_expr: self.virtual_expr,
181+
is_srf: self.is_srf,
171182
}
172183
}
173184
}

src/query/sql/src/planner/binder/select.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ impl Binder {
512512
|| stmt.qualify.is_some()
513513
|| !bind_context.aggregate_info.group_items.is_empty()
514514
|| !bind_context.aggregate_info.aggregate_functions.is_empty()
515-
|| !bind_context.srf_info.srfs.is_empty()
515+
|| bind_context.has_srf_recursive()
516516
{
517517
return Ok(());
518518
}
@@ -579,7 +579,6 @@ impl Binder {
579579
}
580580

581581
let cols = metadata.columns();
582-
583582
let virtual_cols = cols
584583
.iter()
585584
.filter(|col| matches!(col, ColumnEntry::VirtualColumn(_)))

src/query/sql/src/planner/optimizer/optimizers/operator/aggregate/normalize_aggregate.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ impl RuleNormalizeAggregateOptimizer {
151151
data_type: work_c.return_type.clone(),
152152
visibility: Visibility::Visible,
153153
column_name: work_c.display_name.clone(),
154+
is_srf: false,
154155
},
155156
}),
156157
})

src/query/storages/parquet/src/parquet_reader/reader/row_group_reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl RowGroupReader {
8080
op,
8181
&part.meta,
8282
page_locations.as_deref(),
83-
read_settings.with_enable_cache(true),
83+
*read_settings,
8484
);
8585
let mut selection = part
8686
.selectors

src/query/storages/parquet/src/source.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,14 @@ impl Processor for ParquetSource {
274274
State::Init => {
275275
if let Some(part) = self.ctx.get_partition() {
276276
match ParquetPart::from_part(&part)? {
277+
// From Copy Table from Stage, we don't enable cache
277278
ParquetPart::RowGroup(part) => {
278279
if let Some(reader) = self
279280
.row_group_reader
280281
.create_read_policy(
281-
&ReadSettings::from_ctx(&self.ctx)?,
282+
&ReadSettings::from_ctx(&self.ctx)?.with_enable_cache(
283+
!matches!(self.source_type, ParquetSourceType::StageTable),
284+
),
282285
part,
283286
&mut self.topk_sorter,
284287
self.transformer.clone(),
@@ -343,16 +346,16 @@ impl ParquetSource {
343346
read_metadata_async_cached(path, &op, Some(part.compressed_size), &part.dedup_key)
344347
.await?;
345348

346-
if matches!(self.source_type, ParquetSourceType::StageTable) {
349+
let from_stage_table = matches!(self.source_type, ParquetSourceType::StageTable);
350+
if from_stage_table {
347351
check_parquet_schema(
348352
self.row_group_reader.schema_desc(),
349353
meta.file_metadata().schema_descr(),
350354
"first_file",
351355
part.file.as_str(),
352356
)?;
353357
}
354-
self.transformer
355-
.match_by_field_name(matches!(self.source_type, ParquetSourceType::StageTable));
358+
self.transformer.match_by_field_name(from_stage_table);
356359
// The schema of the table in iceberg may be inconsistent with the schema in parquet
357360
let reader = if self.row_group_reader.schema_desc().root_schema()
358361
!= meta.file_metadata().schema_descr().root_schema()
@@ -402,7 +405,7 @@ impl ParquetSource {
402405

403406
let reader = reader
404407
.create_read_policy(
405-
&ReadSettings::from_ctx(&self.ctx)?,
408+
&ReadSettings::from_ctx(&self.ctx)?.with_enable_cache(!from_stage_table),
406409
&part,
407410
&mut self.topk_sorter,
408411
self.transformer.clone(),

tests/sqllogictests/suites/query/lateral.test

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ select * from t, lateral(select unnest([a, a+1, a+2]) as b) t1 order by a, b
6363
3 4
6464
3 5
6565

66+
statement ok
67+
create table test_mm(payload variant, create_date Date);
68+
69+
query T
70+
select test_mm.payload payload from test_mm,
71+
lateral flatten(payload:key) walks
72+
where create_date > date_add(hour, -6, current_timestamp())
73+
and walks.value:source_person_id::varchar = 'xvcvz' limit 1;
6674

6775
statement ok
6876
CREATE TABLE user_activities(user_id int, activities variant)

0 commit comments

Comments
 (0)