@@ -4,9 +4,6 @@ use deltalake::arrow::error::ArrowError;
4
4
use deltalake:: arrow:: record_batch:: RecordBatch ;
5
5
use etl:: types:: { Cell , TableRow , TableSchema } ;
6
6
use std:: sync:: Arc ;
7
-
8
- use crate :: delta:: schema:: postgres_to_delta_schema;
9
-
10
7
/// Converts TableRows to Arrow RecordBatch for Delta Lake writes.
///
/// This is a unit struct with no fields: it carries no state and only serves
/// as a namespace for the conversion routines defined in its `impl` block.
11
8
pub struct TableRowEncoder ;
12
9
@@ -29,17 +26,27 @@ impl TableRowEncoder {
29
26
table_schema : & TableSchema ,
30
27
table_rows : Vec < TableRow > ,
31
28
) -> Result < RecordBatch , ArrowError > {
32
- // Create Arrow schema from TableSchema
33
- let delta_schema = postgres_to_delta_schema ( table_schema)
34
- . map_err ( |e| ArrowError :: ExternalError ( Box :: new ( e) ) ) ?;
29
+ let arrays = Self :: convert_columns_to_arrays ( table_schema, & table_rows) ?;
35
30
36
- // Convert Delta schema to Arrow schema
37
- let arrow_schema = Self :: delta_schema_to_arrow ( & delta_schema) ?;
31
+ // Create Arrow schema that MATCHES the actual array types we generated
32
+ let fields: Vec < Field > = table_schema
33
+ . column_schemas
34
+ . iter ( )
35
+ . zip ( arrays. iter ( ) )
36
+ . map ( |( col_schema, array) | {
37
+ Field :: new (
38
+ & col_schema. name ,
39
+ array. data_type ( ) . clone ( ) ,
40
+ col_schema. nullable ,
41
+ )
42
+ } )
43
+ . collect ( ) ;
38
44
39
- // Convert each column's data to Arrow arrays
40
- let arrays = Self :: convert_columns_to_arrays ( table_schema, & table_rows) ?;
45
+ let arrow_schema = Schema :: new ( fields) ;
41
46
42
- RecordBatch :: try_new ( Arc :: new ( arrow_schema) , arrays)
47
+ let result = RecordBatch :: try_new ( Arc :: new ( arrow_schema) , arrays) ;
48
+
49
+ result
43
50
}
44
51
45
52
/// Convert Delta schema to Arrow schema
@@ -100,38 +107,80 @@ impl TableRowEncoder {
100
107
Ok ( arrays)
101
108
}
102
109
103
- /// Convert a column of Cells to an Arrow array
110
+ /// Convert a column of Cells to an Arrow array based on the first non-null value's type
104
111
fn convert_cell_column_to_array ( cells : Vec < & Cell > ) -> Result < ArrayRef , ArrowError > {
105
- // todo(abhi): Implement proper type detection and conversion
106
- // todo(abhi): Handle all Cell variants: Null, Bool, String, I16, I32, U32, I64, F32, F64,
107
- // Numeric, Date, Time, Timestamp, TimestampTz, Uuid, Json, Bytes, Array
108
-
109
- // For now, convert everything to string as a stub
110
- let string_values: Vec < Option < String > > = cells
111
- . iter ( )
112
- . map ( |cell| match cell {
113
- Cell :: Null => None ,
114
- Cell :: Bool ( b) => Some ( b. to_string ( ) ) ,
115
- Cell :: String ( s) => Some ( s. clone ( ) ) ,
116
- Cell :: I16 ( i) => Some ( i. to_string ( ) ) ,
117
- Cell :: I32 ( i) => Some ( i. to_string ( ) ) ,
118
- Cell :: U32 ( i) => Some ( i. to_string ( ) ) ,
119
- Cell :: I64 ( i) => Some ( i. to_string ( ) ) ,
120
- Cell :: F32 ( f) => Some ( f. to_string ( ) ) ,
121
- Cell :: F64 ( f) => Some ( f. to_string ( ) ) ,
122
- Cell :: Numeric ( n) => Some ( n. to_string ( ) ) ,
123
- Cell :: Date ( d) => Some ( d. to_string ( ) ) ,
124
- Cell :: Time ( t) => Some ( t. to_string ( ) ) ,
125
- Cell :: Timestamp ( ts) => Some ( ts. to_string ( ) ) ,
126
- Cell :: TimestampTz ( ts) => Some ( ts. to_string ( ) ) ,
127
- Cell :: Uuid ( u) => Some ( u. to_string ( ) ) ,
128
- Cell :: Json ( j) => Some ( j. to_string ( ) ) ,
129
- Cell :: Bytes ( b) => Some ( format ! ( "{b:?}" ) ) ,
130
- Cell :: Array ( a) => Some ( format ! ( "{a:?}" ) ) ,
131
- } )
132
- . collect ( ) ;
112
+ if cells. is_empty ( ) {
113
+ return Ok ( Arc :: new ( StringArray :: from ( Vec :: < Option < String > > :: new ( ) ) ) ) ;
114
+ }
133
115
134
- Ok ( Arc :: new ( StringArray :: from ( string_values) ) )
116
+ // Determine the column type from the first non-null cell
117
+ let first_non_null = cells. iter ( ) . find ( |cell| !matches ! ( cell, Cell :: Null ) ) ;
118
+
119
+ match first_non_null {
120
+ Some ( Cell :: Bool ( _) ) => {
121
+ let bool_values: Vec < Option < bool > > = cells
122
+ . iter ( )
123
+ . map ( |cell| match cell {
124
+ Cell :: Null => None ,
125
+ Cell :: Bool ( b) => Some ( * b) ,
126
+ _ => None , // Invalid conversion, treat as null
127
+ } )
128
+ . collect ( ) ;
129
+ Ok ( Arc :: new ( BooleanArray :: from ( bool_values) ) )
130
+ }
131
+ Some ( Cell :: I32 ( _) ) => {
132
+ let int_values: Vec < Option < i32 > > = cells
133
+ . iter ( )
134
+ . map ( |cell| match cell {
135
+ Cell :: Null => None ,
136
+ Cell :: I32 ( i) => Some ( * i) ,
137
+ Cell :: I16 ( i) => Some ( * i as i32 ) ,
138
+ Cell :: U32 ( i) => Some ( * i as i32 ) ,
139
+ _ => None ,
140
+ } )
141
+ . collect ( ) ;
142
+ Ok ( Arc :: new ( Int32Array :: from ( int_values) ) )
143
+ }
144
+ Some ( Cell :: I16 ( _) ) => {
145
+ let int_values: Vec < Option < i32 > > = cells
146
+ . iter ( )
147
+ . map ( |cell| match cell {
148
+ Cell :: Null => None ,
149
+ Cell :: I16 ( i) => Some ( * i as i32 ) ,
150
+ Cell :: I32 ( i) => Some ( * i) ,
151
+ _ => None ,
152
+ } )
153
+ . collect ( ) ;
154
+ Ok ( Arc :: new ( Int32Array :: from ( int_values) ) )
155
+ }
156
+ _ => {
157
+ // For all other types (String, Numeric, etc.), convert to string
158
+ let string_values: Vec < Option < String > > = cells
159
+ . iter ( )
160
+ . map ( |cell| match cell {
161
+ Cell :: Null => None ,
162
+ Cell :: Bool ( b) => Some ( b. to_string ( ) ) ,
163
+ Cell :: String ( s) => Some ( s. clone ( ) ) ,
164
+ Cell :: I16 ( i) => Some ( i. to_string ( ) ) ,
165
+ Cell :: I32 ( i) => Some ( i. to_string ( ) ) ,
166
+ Cell :: U32 ( i) => Some ( i. to_string ( ) ) ,
167
+ Cell :: I64 ( i) => Some ( i. to_string ( ) ) ,
168
+ Cell :: F32 ( f) => Some ( f. to_string ( ) ) ,
169
+ Cell :: F64 ( f) => Some ( f. to_string ( ) ) ,
170
+ Cell :: Numeric ( n) => Some ( n. to_string ( ) ) ,
171
+ Cell :: Date ( d) => Some ( d. to_string ( ) ) ,
172
+ Cell :: Time ( t) => Some ( t. to_string ( ) ) ,
173
+ Cell :: Timestamp ( ts) => Some ( ts. to_string ( ) ) ,
174
+ Cell :: TimestampTz ( ts) => Some ( ts. to_string ( ) ) ,
175
+ Cell :: Uuid ( u) => Some ( u. to_string ( ) ) ,
176
+ Cell :: Json ( j) => Some ( j. to_string ( ) ) ,
177
+ Cell :: Bytes ( b) => Some ( format ! ( "{b:?}" ) ) ,
178
+ Cell :: Array ( a) => Some ( format ! ( "{a:?}" ) ) ,
179
+ } )
180
+ . collect ( ) ;
181
+ Ok ( Arc :: new ( StringArray :: from ( string_values) ) )
182
+ }
183
+ }
135
184
}
136
185
137
186
/// Convert Cell values to specific Arrow array types
0 commit comments