1
1
use deltalake:: DeltaTableError ;
2
2
use deltalake:: datafusion:: common:: Column ;
3
- use deltalake:: datafusion:: prelude:: { SessionContext , col } ;
3
+ use deltalake:: datafusion:: prelude:: SessionContext ;
4
4
use deltalake:: operations:: merge:: MergeBuilder ;
5
5
use deltalake:: { DeltaResult , DeltaTable , datafusion:: prelude:: Expr } ;
6
6
use etl:: types:: { TableRow as PgTableRow , TableSchema as PgTableSchema } ;
@@ -10,6 +10,10 @@ use crate::deltalake::config::DeltaTableConfig;
10
10
use crate :: deltalake:: expr:: qualify_primary_keys;
11
11
use crate :: deltalake:: schema:: postgres_to_arrow_schema;
12
12
13
+ pub ( crate ) fn source_qualified_column_expr ( column_name : & str , source_alias : & str ) -> Expr {
14
+ Expr :: Column ( Column :: new ( Some ( source_alias) , column_name) )
15
+ }
16
+
13
17
pub async fn merge_to_table (
14
18
table : & mut DeltaTable ,
15
19
config : & DeltaTableConfig ,
@@ -55,12 +59,18 @@ pub async fn merge_to_table(
55
59
. with_target_alias ( "target" )
56
60
. when_not_matched_insert ( |insert| {
57
61
all_columns. iter ( ) . fold ( insert, |insert, & column| {
58
- insert. set ( column. to_string ( ) , col ( format ! ( "source.{column}" ) ) )
62
+ insert. set (
63
+ column. to_string ( ) ,
64
+ source_qualified_column_expr ( column, "source" ) ,
65
+ )
59
66
} )
60
67
} ) ?
61
68
. when_matched_update ( |update| {
62
69
all_columns. iter ( ) . fold ( update, |update, & column| {
63
- update. update ( column. to_string ( ) , col ( format ! ( "source.{column}" ) ) )
70
+ update. update (
71
+ column. to_string ( ) ,
72
+ source_qualified_column_expr ( column, "source" ) ,
73
+ )
64
74
} )
65
75
} ) ?;
66
76
@@ -73,3 +83,45 @@ pub async fn merge_to_table(
73
83
* table = merged_table;
74
84
Ok ( ( ) )
75
85
}
86
+
87
+ #[ cfg( test) ]
88
+ mod tests {
89
+ use super :: * ;
90
+ use insta:: assert_debug_snapshot;
91
+
92
+ #[ test]
93
+ fn source_qualified_column_expr_preserves_case_and_alias ( ) {
94
+ let expr = source_qualified_column_expr ( "CASESensitivecolumn" , "source" ) ;
95
+
96
+ assert_debug_snapshot ! ( expr, @r#"
97
+ Column(
98
+ Column {
99
+ relation: Some(
100
+ Bare {
101
+ table: "source",
102
+ },
103
+ ),
104
+ name: "CASESensitivecolumn",
105
+ },
106
+ )
107
+ "# ) ;
108
+ }
109
+
110
+ #[ test]
111
+ fn source_qualified_column_expr_handles_lowercase ( ) {
112
+ let expr = source_qualified_column_expr ( "lowercasecolumn" , "source" ) ;
113
+
114
+ assert_debug_snapshot ! ( expr, @r#"
115
+ Column(
116
+ Column {
117
+ relation: Some(
118
+ Bare {
119
+ table: "source",
120
+ },
121
+ ),
122
+ name: "lowercasecolumn",
123
+ },
124
+ )
125
+ "# ) ;
126
+ }
127
+ }
0 commit comments