@@ -29,8 +29,11 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_input_stream,
+};
 use futures::StreamExt;
+use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::spec::{DataFile, DataFileFormat, DataFileSerde, FormatVersion};
 use iceberg::table::Table;
 use iceberg::writer::CurrentFileStatus;
@@ -39,6 +42,7 @@ use iceberg::writer::file_writer::location_generator::{
 };
 use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};
 use parquet::file::properties::WriterProperties;
+use uuid::Uuid;

 use crate::to_datafusion_error;

@@ -159,14 +163,24 @@ impl ExecutionPlan for IcebergWriteExec {
             self.table.file_io().clone(),
             DefaultLocationGenerator::new(self.table.metadata().clone())
                 .map_err(to_datafusion_error)?,
-            // todo actual filename
-            DefaultFileNameGenerator::new("what".to_string(), None, DataFileFormat::Parquet),
+            // todo filename prefix/suffix should be configurable
+            DefaultFileNameGenerator::new(
+                "datafusion".to_string(),
+                Some(Uuid::now_v7().to_string()),
+                DataFileFormat::Parquet,
+            ),
         )
         .build();

-        // todo repartition
-        let data = self.input.execute(partition, context)?;
-        let result_schema = Arc::clone(&self.result_schema);
+        let data = execute_input_stream(
+            Arc::clone(&self.input),
+            Arc::new(
+                schema_to_arrow_schema(self.table.metadata().current_schema())
+                    .map_err(to_datafusion_error)?,
+            ),
+            partition,
+            Arc::clone(&context),
+        )?;

         // todo non-default partition spec?
         let spec_id = self.table.metadata().default_partition_spec_id();
@@ -175,7 +189,6 @@ impl ExecutionPlan for IcebergWriteExec {

         let stream = futures::stream::once(async move {
             let mut writer = parquet_writer_fut.await.map_err(to_datafusion_error)?;
-
             let mut input_stream = data;

             while let Some(batch_res) = input_stream.next().await {
@@ -187,27 +200,45 @@ impl ExecutionPlan for IcebergWriteExec {
             let data_file_builders = writer.close().await.map_err(to_datafusion_error)?;

             // Convert builders to data files
-            let data_files = data_file_builders
+            let data_files: DFResult<Vec<DataFile>> = data_file_builders
                 .into_iter()
-                .map(|mut builder| builder.partition_spec_id(spec_id).build().unwrap())
-                .collect::<Vec<DataFile>>();
+                .map(|mut builder| {
+                    builder.partition_spec_id(spec_id).build().map_err(|e| {
+                        DataFusionError::Execution(format!("Failed to build data file: {}", e))
+                    })
+                })
+                .collect();
+            let data_files = data_files?;

-            let data_files = data_files
+            let data_files: DFResult<Vec<String>> = data_files
                 .into_iter()
                 .map(|f| {
-                    let serde = DataFileSerde::try_from(f, &partition_type, is_version_1).unwrap();
-                    let json = serde_json::to_string(&serde).unwrap();
+                    // Convert to DataFileSerde
+                    let serde =
+                        DataFileSerde::try_from(f, &partition_type, is_version_1).map_err(|e| {
+                            DataFusionError::Execution(format!(
+                                "Failed to convert to DataFileSerde: {}",
+                                e
+                            ))
+                        })?;
+
+                    // Serialize to JSON
+                    let json = serde_json::to_string(&serde).map_err(|e| {
+                        DataFusionError::Execution(format!("Failed to serialize to JSON: {}", e))
+                    })?;
+
                     println!("Serialized data file: {}", json); // todo remove log
-                    json
+                    Ok(json)
                 })
-                .collect::<Vec<String>>();
+                .collect();
+            let data_files = data_files?;

             Ok(Self::make_result_batch(count, data_files)?)
         })
         .boxed();

         Ok(Box::pin(RecordBatchStreamAdapter::new(
-            result_schema,
+            Arc::clone(&self.result_schema),
             stream,
         )))
     }