Skip to content

Commit 4ade7f6

Browse files
committed
fix(cube): Make protobuf column relation serialization be quoted
1 parent cf1376a commit 4ade7f6

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

datafusion/proto-common/src/to_proto/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ impl From<Column> for protobuf::Column {
225225
fn from(c: Column) -> Self {
226226
Self {
227227
relation: c.relation.map(|relation| protobuf::ColumnRelation {
228-
relation: relation.to_string(),
228+
relation: relation.to_quoted_string(),
229229
}),
230230
name: c.name,
231231
}

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ use datafusion::test_util::{TestTableFactory, TestTableProvider};
5353
use datafusion_common::config::TableOptions;
5454
use datafusion_common::scalar::ScalarStructBuilder;
5555
use datafusion_common::{
56-
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef,
57-
DataFusionError, Result, ScalarValue, TableReference,
56+
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference
5857
};
5958
use datafusion_expr::dml::CopyTo;
6059
use datafusion_expr::expr::{
@@ -137,6 +136,68 @@ async fn roundtrip_logical_plan() -> Result<()> {
137136
Ok(())
138137
}
139138

139+
async fn run_simple_roundtrip_with_table_name(table_name: &str) -> Result<()> {
140+
let ctx = SessionContext::new();
141+
ctx.register_csv(table_name, "tests/testdata/test.csv", CsvReadOptions::default())
142+
.await?;
143+
let scan = ctx.table(format!("`{}`", table_name)).await?
144+
.into_optimized_plan()?;
145+
let proj_scan = LogicalPlan::Projection(datafusion_expr::Projection::try_new(vec![col(&format!("`{}`.a", table_name)), col(&format!("`{}`.b", table_name))], Arc::new(scan))?);
146+
let extension_codec = TopKExtensionCodec {};
147+
let bytes = logical_plan_to_bytes_with_extension_codec(&proj_scan, &extension_codec)?;
148+
let logical_round_trip =
149+
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?;
150+
let formatted_proj_scan = format!("{proj_scan:?}");
151+
// Sanity check that formatting includes the table name.
152+
assert!(formatted_proj_scan.contains(table_name));
153+
assert_eq!(formatted_proj_scan, format!("{logical_round_trip:?}"));
154+
Ok(())
155+
}
156+
157+
#[tokio::test]
158+
async fn roundtrip_logical_plan_noncapitalized() -> Result<()> {
159+
run_simple_roundtrip_with_table_name("table1").await
160+
}
161+
162+
#[tokio::test]
163+
async fn roundtrip_logical_plan_capitalized() -> Result<()> {
164+
run_simple_roundtrip_with_table_name("Table1").await
165+
}
166+
167+
async fn run_roundtrip_with_column_names(table_name: &str, column_name: &str) -> Result<()> {
168+
let ctx = SessionContext::new();
169+
ctx.register_csv(table_name, "tests/testdata/test.csv", CsvReadOptions::default())
170+
.await?;
171+
let scan = ctx.table(format!("`{}`", table_name)).await?
172+
.into_optimized_plan()?;
173+
println!("Constructed scan");
174+
let proj_scan = LogicalPlan::Projection(datafusion_expr::Projection::try_new(vec![col(&format!("`{}`.a", table_name)).alias(column_name), col(&format!("`{}`.b", table_name))], Arc::new(scan))?);
175+
println!("Constructed proj_scan");
176+
let proj_scan_ii = LogicalPlan::Projection(datafusion_expr::Projection::try_new(vec![col(&format!("`{}`", column_name)), col(&format!("`{}`.b", table_name))], Arc::new(proj_scan))?);
177+
println!("Constructed proj_scan_ii");
178+
let extension_codec = TopKExtensionCodec {};
179+
let bytes = logical_plan_to_bytes_with_extension_codec(&proj_scan_ii, &extension_codec)?;
180+
let logical_round_trip =
181+
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?;
182+
let formatted_proj_scan = format!("{proj_scan_ii:?}");
183+
// Sanity check that formatting includes the table name.
184+
assert!(formatted_proj_scan.contains(table_name));
185+
assert!(formatted_proj_scan.contains(column_name));
186+
assert_eq!(formatted_proj_scan, format!("{logical_round_trip:?}"));
187+
Ok(())
188+
}
189+
190+
#[tokio::test]
191+
async fn roundtrip_logical_plan_noncapitalized_column() -> Result<()> {
192+
run_roundtrip_with_column_names("Table1", "alpha").await
193+
}
194+
195+
#[tokio::test]
196+
async fn roundtrip_logical_plan_capitalized_column() -> Result<()> {
197+
run_roundtrip_with_column_names("Table1", "Alpha").await
198+
}
199+
200+
140201
#[derive(Clone, PartialEq, Eq, ::prost::Message)]
141202
pub struct TestTableProto {
142203
/// URL of the table root

0 commit comments

Comments
 (0)