@@ -17,6 +17,7 @@
 
 use std::any::Any;
 use std::fmt::{Debug, Formatter};
+use std::str::FromStr;
 use std::sync::Arc;
 
 use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
@@ -34,13 +35,18 @@ use datafusion::physical_plan::{
 };
 use futures::StreamExt;
 use iceberg::arrow::schema_to_arrow_schema;
-use iceberg::spec::{DataFileFormat, FormatVersion, serialize_data_file_to_json};
+use iceberg::spec::{
+    DataFileFormat, FormatVersion, PROPERTY_DEFAULT_FILE_FORMAT,
+    PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, serialize_data_file_to_json,
+};
 use iceberg::table::Table;
-use iceberg::writer::CurrentFileStatus;
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
-use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Error, ErrorKind};
 use parquet::file::properties::WriterProperties;
 use uuid::Uuid;
 
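For readers unfamiliar with the new constants: `PROPERTY_DEFAULT_FILE_FORMAT` is the table property key for the default data-file format (`write.format.default` in the Iceberg table-properties spec) and `PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT` is its fallback value. A minimal sketch of the lookup that the new execute() body performs below; the helper name `default_file_format` and the standalone `HashMap` argument are invented for illustration:

use std::collections::HashMap;
use std::str::FromStr;

use iceberg::spec::{
    DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT,
};

// Resolve the table's default data-file format from its properties,
// falling back to the library default when the property is unset.
fn default_file_format(props: &HashMap<String, String>) -> iceberg::Result<DataFileFormat> {
    let value = props
        .get(PROPERTY_DEFAULT_FILE_FORMAT)
        .map(String::as_str)
        .unwrap_or(PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT);
    DataFileFormat::from_str(value)
}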
@@ -157,21 +163,50 @@ impl ExecutionPlan for IcebergWriteExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
-        let parquet_writer_fut = ParquetWriterBuilder::new(
-            WriterProperties::default(),
-            self.table.metadata().current_schema().clone(),
-            self.table.file_io().clone(),
-            DefaultLocationGenerator::new(self.table.metadata().clone())
-                .map_err(to_datafusion_error)?,
-            // todo filename prefix/suffix should be configurable
-            DefaultFileNameGenerator::new(
-                "datafusion".to_string(),
-                Some(Uuid::now_v7().to_string()),
-                DataFileFormat::Parquet,
-            ),
+        // todo non-default partition spec?
+        let spec_id = self.table.metadata().default_partition_spec_id();
+        let partition_type = self.table.metadata().default_partition_type().clone();
+        let is_version_1 = self.table.metadata().format_version() == FormatVersion::V1;
+
+        // Check data file format
+        let file_format = DataFileFormat::from_str(
+            self.table
+                .metadata()
+                .properties()
+                .get(PROPERTY_DEFAULT_FILE_FORMAT)
+                .unwrap_or(&PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
         )
-        .build();
+        .map_err(to_datafusion_error)?;
+        if file_format != DataFileFormat::Parquet {
+            return Err(to_datafusion_error(Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!(
+                    "File format {} is not supported for insert_into yet!",
+                    file_format
+                ),
+            )));
+        }
 
+        // Create data file writer builder
+        let data_file_writer_builder = DataFileWriterBuilder::new(
+            ParquetWriterBuilder::new(
+                WriterProperties::default(),
+                self.table.metadata().current_schema().clone(),
+                self.table.file_io().clone(),
+                DefaultLocationGenerator::new(self.table.metadata().clone())
+                    .map_err(to_datafusion_error)?,
+                // todo filename prefix/suffix should be configurable
+                DefaultFileNameGenerator::new(
+                    "datafusion".to_string(),
+                    Some(Uuid::now_v7().to_string()),
+                    file_format,
+                ),
+            ),
+            None,
+            spec_id,
+        );
+
+        // Get input data
         let data = execute_input_stream(
             Arc::clone(&self.input),
             Arc::new(
@@ -182,18 +217,16 @@ impl ExecutionPlan for IcebergWriteExec {
             Arc::clone(&context),
         )?;
 
-        // todo non-default partition spec?
-        let spec_id = self.table.metadata().default_partition_spec_id();
-        let partition_type = self.table.metadata().default_partition_type().clone();
-        let is_version_1 = self.table.metadata().format_version() == FormatVersion::V1;
-
+        // Create write stream
         let stream = futures::stream::once(async move {
-            let mut writer = parquet_writer_fut.await.map_err(to_datafusion_error)?;
+            let mut writer = data_file_writer_builder
+                .build()
+                .await
+                .map_err(to_datafusion_error)?;
             let mut input_stream = data;
 
-            while let Some(batch_res) = input_stream.next().await {
-                let batch = batch_res?;
-                writer.write(&batch).await.map_err(to_datafusion_error)?;
+            while let Some(batch) = input_stream.next().await {
+                writer.write(batch?).await.map_err(to_datafusion_error)?;
             }
 
             let count = writer.current_row_num() as u64;
@@ -202,12 +235,7 @@ impl ExecutionPlan for IcebergWriteExec {
             // Convert builders to data files and then to JSON strings
             let data_files: Vec<String> = data_file_builders
                 .into_iter()
-                .map(|mut builder| -> DFResult<String> {
-                    // Build the data file
-                    let data_file = builder.partition_spec_id(spec_id).build().map_err(|e| {
-                        DataFusionError::Execution(format!("Failed to build data file: {}", e))
-                    })?;
-
+                .map(|data_file| -> DFResult<String> {
                     // Serialize to JSON
                     let json =
                         serialize_data_file_to_json(data_file, &partition_type, is_version_1)
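Taken together, the change swaps the raw `FileWriter`/`FileWriterBuilder` API for iceberg-rust's layered writer stack: a `DataFileWriterBuilder` wraps the `ParquetWriterBuilder`, and the resulting `IcebergWriter` hands back finished `DataFile` metadata when closed, which is presumably where the `data_file_builders` iterated in the last hunk now originates. A minimal sketch of that lifecycle, assuming the `IcebergWriterBuilder`/`IcebergWriter` traits imported above (exact signatures may differ across iceberg-rust versions); `write_all_batches` and its arguments are invented for illustration:

use datafusion::arrow::array::RecordBatch;
use iceberg::spec::DataFile;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};

// Drive an IcebergWriterBuilder through its full lifecycle: build the
// writer, feed it Arrow batches, and close it to obtain the completed
// DataFile metadata ready for JSON serialization.
async fn write_all_batches(
    builder: impl IcebergWriterBuilder,
    batches: Vec<RecordBatch>,
) -> iceberg::Result<Vec<DataFile>> {
    let mut writer = builder.build().await?;
    for batch in batches {
        writer.write(batch).await?;
    }
    // close() flushes the underlying Parquet output and returns the
    // DataFiles describing what was written.
    writer.close().await
}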