Skip to content

Commit 401d89d

Browse files
authored
fix(query): fix unable get column for join with rowfetch (#18486)
* fix(query): fix unable get column for join with rowfetch * fix(query): fix unable get column for join with rowfetch * fix(query): fix unable get column for join with rowfetch * fix(query): fix unable get column for join with rowfetch * fix(query): fix unable get column for join with rowfetch
1 parent 271ce7d commit 401d89d

File tree

18 files changed

+138
-82
lines changed

18 files changed

+138
-82
lines changed

src/query/service/tests/it/sql/planner/optimizer/ir/expr/visitor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ fn test_replace_node() {
233233
limit: Some(limit_value * 2),
234234
offset: limit.offset,
235235
before_exchange: limit.before_exchange,
236+
lazy_columns: limit.lazy_columns.clone(),
236237
};
237238
let replacement = SExpr::create_unary(
238239
Arc::new(RelOperator::Limit(new_limit)),
@@ -350,6 +351,7 @@ async fn test_async_replace_node() {
350351
limit: Some(limit_value * 2),
351352
offset: limit.offset,
352353
before_exchange: limit.before_exchange,
354+
lazy_columns: limit.lazy_columns.clone(),
353355
};
354356
let replacement = SExpr::create_unary(
355357
Arc::new(RelOperator::Limit(new_limit)),

src/query/service/tests/it/sql/planner/optimizer/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,7 @@ impl ExprBuilder {
572572
limit: Some(limit),
573573
offset,
574574
before_exchange: false,
575+
lazy_columns: Default::default(),
575576
};
576577
SExpr::create_unary(Arc::new(RelOperator::Limit(limit_op)), Arc::new(input))
577578
}

src/query/sql/src/executor/physical_plans/physical_limit.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,20 @@ impl PhysicalPlanBuilder {
5555
// 1. Prune unused Columns.
5656
// Apply lazy.
5757
let metadata = self.metadata.read().clone();
58-
let lazy_columns = metadata.lazy_columns();
59-
required = required
60-
.difference(lazy_columns)
61-
.cloned()
62-
.collect::<ColumnSet>();
63-
required.extend(metadata.row_id_indexes());
58+
59+
let support_lazy_materialize = s_expr.child(0)?.support_lazy_materialize();
60+
if !limit.lazy_columns.is_empty() && support_lazy_materialize {
61+
required = required
62+
.difference(&limit.lazy_columns)
63+
.cloned()
64+
.collect::<ColumnSet>();
65+
66+
required.extend(metadata.row_id_indexes());
67+
}
6468

6569
// 2. Build physical plan.
6670
let input_plan = self.build(s_expr.child(0)?, required).await?;
67-
let metadata = self.metadata.read().clone();
68-
if limit.before_exchange || metadata.lazy_columns().is_empty() {
71+
if limit.before_exchange || limit.lazy_columns.is_empty() || !support_lazy_materialize {
6972
return Ok(PhysicalPlan::Limit(Limit {
7073
plan_id: 0,
7174
input: Box::new(input_plan),
@@ -79,6 +82,7 @@ impl PhysicalPlanBuilder {
7982
let input_schema = input_plan.output_schema()?;
8083

8184
// Lazy materialization is enabled.
85+
let metadata = self.metadata.read();
8286
let row_id_col_index = metadata
8387
.columns()
8488
.iter()
@@ -100,8 +104,8 @@ impl PhysicalPlanBuilder {
100104
// There may be more than one `LIMIT` plan, we don't need to fetch the same columns multiple times.
101105
// See the case in tests/sqllogictests/suites/crdb/limit:
102106
// SELECT * FROM (SELECT * FROM t_47283 ORDER BY k LIMIT 4) WHERE a > 5 LIMIT 1
103-
let lazy_columns = metadata
104-
.lazy_columns()
107+
let lazy_columns = limit
108+
.lazy_columns
105109
.iter()
106110
.filter(|index| !input_schema.has_field(&index.to_string())) // If the column is already in the input schema, we don't need to fetch it.
107111
.cloned()
@@ -143,10 +147,9 @@ impl PhysicalPlanBuilder {
143147
has_inner_column,
144148
true,
145149
true,
146-
false,
147150
);
148151

149-
Ok(PhysicalPlan::RowFetch(RowFetch {
152+
let fetch = PhysicalPlan::RowFetch(RowFetch {
150153
plan_id: 0,
151154
input: Box::new(PhysicalPlan::Limit(Limit {
152155
plan_id: 0,
@@ -161,6 +164,8 @@ impl PhysicalPlanBuilder {
161164
fetched_fields,
162165
need_wrap_nullable: false,
163166
stat_info: Some(stat_info),
164-
}))
167+
});
168+
169+
Ok(fetch)
165170
}
166171
}

src/query/sql/src/executor/physical_plans/physical_mutation.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,6 @@ fn build_mutation_row_fetch(
649649
has_inner_column,
650650
true,
651651
true,
652-
false,
653652
);
654653

655654
RowFetch {

src/query/sql/src/executor/physical_plans/physical_table_scan.rs

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::btree_map::Entry;
1615
use std::collections::BTreeMap;
1716
use std::collections::HashSet;
1817
use std::sync::Arc;
@@ -217,20 +216,14 @@ impl PhysicalPlanBuilder {
217216
}
218217
}
219218

220-
if !metadata.lazy_columns().is_empty() {
221-
// Lazy materialization is enabled.
222-
if let Entry::Vacant(entry) = name_mapping.entry(ROW_ID_COL_NAME.to_string()) {
219+
if !name_mapping.contains_key(ROW_ID_COL_NAME) {
220+
let metadata = self.metadata.read();
221+
if let Some(index) = metadata.row_id_index_by_table_index(scan.table_index) {
223222
let internal_column = INTERNAL_COLUMN_FACTORY
224223
.get_internal_column(ROW_ID_COL_NAME)
225224
.unwrap();
226-
if let Some(index) = self
227-
.metadata
228-
.read()
229-
.row_id_index_by_table_index(scan.table_index)
230-
{
231-
entry.insert(index);
232-
project_internal_columns.insert(index, internal_column);
233-
}
225+
name_mapping.insert(ROW_ID_COL_NAME.to_string(), index);
226+
project_internal_columns.insert(index, internal_column);
234227
}
235228
}
236229

@@ -384,7 +377,6 @@ impl PhysicalPlanBuilder {
384377
// or else in read_partition when search internal column from table schema will core.
385378
true,
386379
true,
387-
true,
388380
);
389381
let has_virtual_column = !virtual_columns.is_empty();
390382

@@ -396,7 +388,6 @@ impl PhysicalPlanBuilder {
396388
has_inner_column,
397389
true,
398390
false,
399-
true,
400391
))
401392
} else {
402393
None
@@ -456,7 +447,6 @@ impl PhysicalPlanBuilder {
456447
has_inner_column,
457448
true,
458449
false,
459-
true,
460450
);
461451
let prewhere_columns = Self::build_projection(
462452
&metadata,
@@ -465,7 +455,6 @@ impl PhysicalPlanBuilder {
465455
has_inner_column,
466456
true,
467457
true,
468-
true,
469458
);
470459
let remain_columns = Self::build_projection(
471460
&metadata,
@@ -474,7 +463,6 @@ impl PhysicalPlanBuilder {
474463
has_inner_column,
475464
true,
476465
true,
477-
true,
478466
);
479467

480468
let predicate = prewhere
@@ -741,15 +729,11 @@ impl PhysicalPlanBuilder {
741729
has_inner_column: bool,
742730
ignore_internal_column: bool,
743731
add_virtual_source_column: bool,
744-
ignore_lazy_column: bool,
745732
) -> Projection {
746733
if !has_inner_column {
747734
let mut col_indices = Vec::new();
748735
let mut virtual_col_indices = HashSet::new();
749736
for index in columns {
750-
if ignore_lazy_column && metadata.is_lazy_column(*index) {
751-
continue;
752-
}
753737
let name = match metadata.column(*index) {
754738
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) => {
755739
column_name
@@ -787,9 +771,6 @@ impl PhysicalPlanBuilder {
787771
} else {
788772
let mut col_indices = BTreeMap::new();
789773
for index in columns {
790-
if ignore_lazy_column && metadata.is_lazy_column(*index) {
791-
continue;
792-
}
793774
let column = metadata.column(*index);
794775
match column {
795776
ColumnEntry::BaseTableColumn(BaseTableColumn {

src/query/sql/src/executor/physical_plans/physical_union_all.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,10 @@ impl PhysicalPlanBuilder {
5454
&mut self,
5555
s_expr: &SExpr,
5656
union_all: &crate::plans::UnionAll,
57-
mut required: ColumnSet,
57+
required: ColumnSet,
5858
stat_info: PlanStatsInfo,
5959
) -> Result<PhysicalPlan> {
6060
// 1. Prune unused Columns.
61-
let metadata = self.metadata.read().clone();
62-
let lazy_columns = metadata.lazy_columns();
63-
required.extend(lazy_columns);
64-
6561
// Use left's output columns as the offset indices
6662
// if the union has a CTE, the output columns are not filtered
6763
// otherwise, if the output columns of the union do not contain the columns used by the plan in the union, the expression will fail to obtain data.

src/query/sql/src/planner/binder/bind_query/bind_limit.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,14 @@ impl Binder {
3737
return s_expr;
3838
}
3939

40+
let mut metadata = self.metadata.write();
41+
let lazy_columns = metadata.lazy_columns().clone();
42+
metadata.clear_lazy_columns();
4043
let limit_plan = Limit {
4144
before_exchange: false,
4245
limit,
4346
offset,
47+
lazy_columns,
4448
};
4549
SExpr::create_unary(Arc::new(limit_plan.into()), Arc::new(s_expr))
4650
}

src/query/sql/src/planner/binder/select.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,9 @@ impl Binder {
639639

640640
// Single table, the table index is 0.
641641
let table_index = 0;
642-
if metadata.row_id_index_by_table_index(table_index).is_none() {
642+
if !metadata.lazy_columns().is_empty()
643+
&& metadata.row_id_index_by_table_index(table_index).is_none()
644+
{
643645
let internal_column = INTERNAL_COLUMN_FACTORY
644646
.get_internal_column(ROW_ID_COL_NAME)
645647
.unwrap();

src/query/sql/src/planner/dataframe.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ impl Dataframe {
336336
before_exchange: false,
337337
limit,
338338
offset,
339+
lazy_columns: Default::default(),
339340
};
340341
self.s_expr =
341342
SExpr::create_unary(Arc::new(limit_plan.into()), Arc::new(self.s_expr.clone()));

src/query/sql/src/planner/optimizer/ir/expr/s_expr.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,15 @@ impl SExpr {
211211
self.plan.has_subquery() || self.children.iter().any(|child| child.has_subquery())
212212
}
213213

214+
#[recursive::recursive]
215+
pub(crate) fn support_lazy_materialize(&self) -> bool {
216+
self.plan.support_lazy_materialize()
217+
&& self
218+
.children
219+
.iter()
220+
.all(|child| child.support_lazy_materialize())
221+
}
222+
214223
#[recursive::recursive]
215224
pub fn get_udfs(&self) -> Result<HashSet<&String>> {
216225
let mut udfs = HashSet::new();

0 commit comments

Comments
 (0)