@@ -29,16 +29,20 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_input_stream,
+};
 use futures::StreamExt;
-use iceberg::spec::{DataFile, DataFileFormat, DataFileSerde, FormatVersion};
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::spec::{DataFileFormat, DataFileSerde, FormatVersion};
 use iceberg::table::Table;
 use iceberg::writer::CurrentFileStatus;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};
 use parquet::file::properties::WriterProperties;
+use uuid::Uuid;
 
 use crate::to_datafusion_error;
 
@@ -159,14 +163,24 @@ impl ExecutionPlan for IcebergWriteExec {
             self.table.file_io().clone(),
             DefaultLocationGenerator::new(self.table.metadata().clone())
                 .map_err(to_datafusion_error)?,
-            // todo actual filename
-            DefaultFileNameGenerator::new("what".to_string(), None, DataFileFormat::Parquet),
+            // todo filename prefix/suffix should be configurable
+            DefaultFileNameGenerator::new(
+                "datafusion".to_string(),
+                Some(Uuid::now_v7().to_string()),
+                DataFileFormat::Parquet,
+            ),
         )
         .build();
 
-        // todo repartition
-        let data = self.input.execute(partition, context)?;
-        let result_schema = Arc::clone(&self.result_schema);
+        let data = execute_input_stream(
+            Arc::clone(&self.input),
+            Arc::new(
+                schema_to_arrow_schema(self.table.metadata().current_schema())
+                    .map_err(to_datafusion_error)?,
+            ),
+            partition,
+            Arc::clone(&context),
+        )?;
 
         // todo non-default partition spec?
         let spec_id = self.table.metadata().default_partition_spec_id();
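
Note: `execute_input_stream`, which replaces the direct `self.input.execute(...)` call, executes the child plan and, per DataFusion's sink plumbing, checks each incoming batch against the sink schema it is given, here the table's current schema converted with `schema_to_arrow_schema`. The `Uuid::now_v7()` suffix makes concurrently generated file names unique and roughly time-ordered, since v7 UUIDs embed a millisecond timestamp. A minimal standalone sketch of that naming idea follows; the `prefix-uuid.parquet` shape is illustrative only (the real format is `DefaultFileNameGenerator`'s concern), and it assumes the `uuid` crate with the `v7` feature enabled:

use uuid::Uuid;

fn main() {
    // Hypothetical name shape for illustration; DefaultFileNameGenerator
    // owns the actual format used by the writer.
    let prefix = "datafusion";
    // now_v7() embeds a millisecond timestamp, so names generated later
    // sort later (ties within the same millisecond are unordered).
    let name = format!("{}-{}.parquet", prefix, Uuid::now_v7());
    println!("{name}");
}
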
@@ -175,7 +189,6 @@ impl ExecutionPlan for IcebergWriteExec {
 
         let stream = futures::stream::once(async move {
             let mut writer = parquet_writer_fut.await.map_err(to_datafusion_error)?;
-
             let mut input_stream = data;
 
             while let Some(batch_res) = input_stream.next().await {
@@ -186,28 +199,36 @@ impl ExecutionPlan for IcebergWriteExec {
             let count = writer.current_row_num() as u64;
             let data_file_builders = writer.close().await.map_err(to_datafusion_error)?;
 
-            // Convert builders to data files
-            let data_files = data_file_builders
-                .into_iter()
-                .map(|mut builder| builder.partition_spec_id(spec_id).build().unwrap())
-                .collect::<Vec<DataFile>>();
-
-            let data_files = data_files
+            // Convert builders to data files and then to JSON strings
+            let data_files: Vec<String> = data_file_builders
                 .into_iter()
-                .map(|f| {
-                    let serde = DataFileSerde::try_from(f, &partition_type, is_version_1).unwrap();
-                    let json = serde_json::to_string(&serde).unwrap();
+                .map(|mut builder| -> DFResult<String> {
+                    // Build the data file
+                    let data_file = builder.partition_spec_id(spec_id).build().map_err(|e| {
+                        DataFusionError::Execution(format!("Failed to build data file: {}", e))
+                    })?;
+
+                    // Convert to DataFileSerde
+                    let serde = DataFileSerde::try_from(data_file, &partition_type, is_version_1).map_err(|e| {
+                        DataFusionError::Execution(format!("Failed to convert to DataFileSerde: {}", e))
+                    })?;
+
+                    // Serialize to JSON
+                    let json = serde_json::to_string(&serde).map_err(|e| {
+                        DataFusionError::Execution(format!("Failed to serialize to JSON: {}", e))
+                    })?;
+
                     println!("Serialized data file: {}", json); // todo remove log
-                    json
+                    Ok(json)
                 })
-                .collect::<Vec<String>>();
+                .collect::<DFResult<Vec<String>>>()?;
 
-            Ok(Self::make_result_batch(count, data_files)?)
+            Self::make_result_batch(count, data_files)
         })
         .boxed();
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
-            result_schema,
+            Arc::clone(&self.result_schema),
             stream,
         )))
     }
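
Note: the old version's `.unwrap()` calls are gone; each fallible step is mapped into `DataFusionError::Execution`, and the per-file `Result`s are folded with `.collect::<DFResult<Vec<String>>>()?`, so the first failure propagates as a stream error instead of panicking inside the async block. A minimal standalone sketch of that collect-into-Result idiom, with `String` standing in for `DataFusionError`:

fn main() {
    let inputs = ["1", "2", "oops"];

    // Mapping each element to a Result and collecting into Result<Vec<_>>
    // short-circuits on the first Err, mirroring the `?` after .collect()
    // in the diff above.
    let parsed: Result<Vec<i32>, String> = inputs
        .iter()
        .map(|s| {
            s.parse::<i32>()
                .map_err(|e| format!("failed to parse {s:?}: {e}"))
        })
        .collect();

    assert!(parsed.is_err()); // "oops" fails to parse, so the whole collect is Err
}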