Skip to content

Commit e92e15c

Browse files
authored
fix(cubesql): Support unaliased columns for SQL push down (#7199)
1 parent f3bf628 commit e92e15c

File tree

2 files changed

+35
-12
lines changed

2 files changed

+35
-12
lines changed

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -365,14 +365,14 @@ impl ExtensionPlanner for CubeScanExtensionPlanner {
365365
)))?;
366366

367367
let schema = SchemaRef::new(wrapper_node.schema().as_ref().into());
368-
let member_fields = schema
369-
.fields()
370-
.iter()
371-
.map(|f| MemberField::Member(f.name().to_string()))
372-
.collect();
373368
Some(Arc::new(CubeScanExecutionPlan {
374369
schema,
375-
member_fields,
370+
member_fields: wrapper_node.member_fields.as_ref().ok_or_else(|| {
371+
DataFusionError::Internal(format!(
372+
"Member fields are not set for wrapper node. Optimization wasn't performed: {:?}",
373+
wrapper_node
374+
))
375+
})?.clone(),
376376
transport: self.transport.clone(),
377377
request: wrapper_node.request.clone().unwrap_or(scan_node.request.clone()),
378378
wrapped_sql: Some(wrapper_node.wrapped_sql.as_ref().ok_or_else(|| {

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub struct CubeScanWrapperNode {
100100
pub auth_context: AuthContextRef,
101101
pub wrapped_sql: Option<SqlQuery>,
102102
pub request: Option<V1LoadRequestQuery>,
103+
pub member_fields: Option<Vec<MemberField>>,
103104
}
104105

105106
impl CubeScanWrapperNode {
@@ -114,16 +115,23 @@ impl CubeScanWrapperNode {
114115
auth_context,
115116
wrapped_sql: None,
116117
request: None,
118+
member_fields: None,
117119
}
118120
}
119121

120-
pub fn with_sql_and_request(&self, sql: SqlQuery, request: V1LoadRequestQuery) -> Self {
122+
pub fn with_sql_and_request(
123+
&self,
124+
sql: SqlQuery,
125+
request: V1LoadRequestQuery,
126+
member_fields: Vec<MemberField>,
127+
) -> Self {
121128
Self {
122129
wrapped_plan: self.wrapped_plan.clone(),
123130
meta: self.meta.clone(),
124131
auth_context: self.auth_context.clone(),
125132
wrapped_sql: Some(sql),
126133
request: Some(request),
134+
member_fields: Some(member_fields),
127135
}
128136
}
129137
}
@@ -154,15 +162,29 @@ impl CubeScanWrapperNode {
154162
transport: Arc<dyn TransportService>,
155163
load_request_meta: Arc<LoadRequestMeta>,
156164
) -> result::Result<Self, CubeError> {
157-
let (sql, request) = Self::generate_sql_for_node(
165+
let schema = self.schema();
166+
let (sql, request, member_fields) = Self::generate_sql_for_node(
158167
Arc::new(self.clone()),
159168
transport,
160169
load_request_meta,
161170
self.wrapped_plan.clone(),
162-
false,
171+
true,
163172
)
164173
.await
165-
.and_then(|SqlGenerationResult { data_source, mut sql, request, .. }| -> result::Result<_, CubeError> {
174+
.and_then(|SqlGenerationResult { data_source, mut sql, request, column_remapping, .. }| -> result::Result<_, CubeError> {
175+
let member_fields = if let Some(column_remapping) = column_remapping {
176+
schema
177+
.fields()
178+
.iter()
179+
.map(|f| MemberField::Member(column_remapping.get(&Column::from_name(f.name().to_string())).map(|x| x.name.to_string()).unwrap_or(f.name().to_string())))
180+
.collect()
181+
} else {
182+
schema
183+
.fields()
184+
.iter()
185+
.map(|f| MemberField::Member(f.name().to_string()))
186+
.collect()
187+
};
166188
let data_source = data_source.ok_or_else(|| CubeError::internal(format!(
167189
"Can't generate SQL for wrapped select: no data source returned"
168190
)))?;
@@ -178,9 +200,9 @@ impl CubeScanWrapperNode {
178200
})?
179201
.get_sql_templates();
180202
sql.finalize_query(sql_templates).map_err(|e| CubeError::internal(e.to_string()))?;
181-
Ok((sql, request))
203+
Ok((sql, request, member_fields))
182204
})?;
183-
Ok(self.with_sql_and_request(sql, request))
205+
Ok(self.with_sql_and_request(sql, request, member_fields))
184206
}
185207

186208
pub fn generate_sql_for_node(
@@ -1343,6 +1365,7 @@ impl UserDefinedLogicalNode for CubeScanWrapperNode {
13431365
auth_context: self.auth_context.clone(),
13441366
wrapped_sql: self.wrapped_sql.clone(),
13451367
request: self.request.clone(),
1368+
member_fields: self.member_fields.clone(),
13461369
})
13471370
}
13481371
}

0 commit comments

Comments
 (0)