Skip to content

Commit 8d34abb

Browse files
brunalBruno Cauet
andauthored
Fix normalization of columns in JOIN ... USING. (#16560)
* Fix Column mgmt when parsing USING joins. In SqlToRel::parse_join(), when handling JoinConstraint::Using, the identifiers are normalized using IdentNormalizer::normalize(). That normalization lower-cases unquoted identifiers, and keeps the case otherwise (but not the quotes). Until this commit, the normalized column names were passed to LogicalPlanBuilder::join_using() as strings. When each goes through LogicalPlanBuilder::normalize(), Column::From<String>() is called, leading to Column::from_qualified_name(). As it gets an unqualified column, it lower-cases it. This means that if a join is USING("SOME_COLUMN_NAME"), we end up with a Column { name: "some_column_name", ..}. In the end, the join fails, as that lower-case column does not exist. With this commit, SqlToRel::parse_join() calls Column::from_name() on each normalized column and passes those to LogicalPlanBuilder::join_using(). Downstream, in LogicalPlanBuilder::normalize(), there is no need to create the Column objects from strings, and the bug does not happen. This fixes #16120. * Remove genericity from LogicalPlanBuilder::join_using(). Until this commit, LogicalPlanBuilder::join_using() accepted using_keys: Vec<impl Into<Column> + Clone>. This commit removes this, only allowing Vec<Column>. Motivation: passing e.g. Vec<String> for using_keys is bug-prone, as the Strings can have their case modified when converted into a Column. That logic is admissible with a common column name that can be qualified, but some column names cannot (e.g. USING keys). This commit changes the API. However, potential users can trivially fix their code by calling Column::from/from_qualified_name on their using_keys. This forces them to think about what their identifiers represent, which removes a class of potential bugs. Additional bonus: shorter compilation time & binary size. --------- Co-authored-by: Bruno Cauet <[email protected]>
1 parent 2999e41 commit 8d34abb

File tree

4 files changed

+52
-8
lines changed

4 files changed

+52
-8
lines changed

datafusion/core/tests/sql/joins.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion::assert_batches_eq;
1819
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
1920
use datafusion::test_util::register_unbounded_file_with_ordering;
2021

@@ -235,3 +236,50 @@ async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
235236
}
236237
Ok(())
237238
}
239+
240+
#[tokio::test]
241+
async fn join_using_uppercase_column() -> Result<()> {
242+
let schema = Arc::new(Schema::new(vec![Field::new(
243+
"UPPER",
244+
DataType::UInt32,
245+
false,
246+
)]));
247+
let tmp_dir = TempDir::new()?;
248+
let file_path = tmp_dir.path().join("uppercase-column.csv");
249+
let mut file = File::create(file_path.clone())?;
250+
file.write_all("0".as_bytes())?;
251+
drop(file);
252+
253+
let ctx = SessionContext::new();
254+
ctx.register_csv(
255+
"test",
256+
file_path.to_str().unwrap(),
257+
CsvReadOptions::new().schema(&schema).has_header(false),
258+
)
259+
.await?;
260+
261+
let dataframe = ctx
262+
.sql(
263+
r#"
264+
SELECT test."UPPER" FROM "test"
265+
INNER JOIN (
266+
SELECT test."UPPER" FROM "test"
267+
) AS selection USING ("UPPER")
268+
;
269+
"#,
270+
)
271+
.await?;
272+
273+
assert_batches_eq!(
274+
[
275+
"+-------+",
276+
"| UPPER |",
277+
"+-------+",
278+
"| 0 |",
279+
"+-------+",
280+
],
281+
&dataframe.collect().await?
282+
);
283+
284+
Ok(())
285+
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -970,11 +970,7 @@ impl LogicalPlanBuilder {
970970
)
971971
}
972972

973-
pub(crate) fn normalize(
974-
plan: &LogicalPlan,
975-
column: impl Into<Column>,
976-
) -> Result<Column> {
977-
let column = column.into();
973+
pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
978974
if column.relation.is_some() {
979975
// column is already normalized
980976
return Ok(column);
@@ -1127,7 +1123,7 @@ impl LogicalPlanBuilder {
11271123
self,
11281124
right: LogicalPlan,
11291125
join_type: JoinType,
1130-
using_keys: Vec<impl Into<Column> + Clone>,
1126+
using_keys: Vec<Column>,
11311127
) -> Result<Self> {
11321128
let left_keys: Vec<Column> = using_keys
11331129
.clone()

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1827,7 +1827,7 @@ mod tests {
18271827
let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
18281828

18291829
let plan = LogicalPlanBuilder::from(table_scan)
1830-
.join_using(table2_scan, JoinType::Left, vec!["a"])?
1830+
.join_using(table2_scan, JoinType::Left, vec!["a".into()])?
18311831
.project(vec![col("a"), col("b")])?
18321832
.build()?;
18331833

datafusion/sql/src/relation/join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
142142
"Expected identifier in USING clause"
143143
)
144144
})
145-
.map(|ident| self.ident_normalizer.normalize(ident.clone()))
145+
.map(|ident| Column::from_name(self.ident_normalizer.normalize(ident.clone())))
146146
}
147147
})
148148
.collect::<Result<Vec<_>>>()?;

0 commit comments

Comments
 (0)