crates/integrations/datafusion/src/physical_plan: 3 files changed, +10 -5 lines

File 1 of 3:

@@ -37,6 +37,7 @@ use iceberg::spec::{DataFile, deserialize_data_file_from_json};
 use iceberg::table::Table;
 use iceberg::transaction::Transaction;

+use crate::physical_plan::DATA_FILES_COL_NAME;
 use crate::to_datafusion_error;

 /// IcebergCommitExec is responsible for collecting results from multiple IcebergWriteExec
@@ -199,7 +200,7 @@ impl ExecutionPlan for IcebergCommitExec {
             let batch = batch_result?;

             let files_array = batch
-                .column_by_name("data_files")
+                .column_by_name(DATA_FILES_COL_NAME)
                 .ok_or_else(|| {
                     DataFusionError::Internal(
                         "Expected 'data_files' column in input batch".to_string(),
File 2 of 3:

@@ -20,3 +20,5 @@ pub(crate) mod expr_to_predicate;
 pub(crate) mod metadata_scan;
 pub(crate) mod scan;
 pub(crate) mod write;
+
+pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
File 3 of 3:

@@ -51,6 +51,7 @@ use iceberg::{Error, ErrorKind};
 use parquet::file::properties::WriterProperties;
 use uuid::Uuid;

+use crate::physical_plan::DATA_FILES_COL_NAME;
 use crate::to_datafusion_error;

 pub(crate) struct IcebergWriteExec {
@@ -88,15 +89,16 @@ impl IcebergWriteExec {
     fn make_result_batch(data_files: Vec<String>) -> DFResult<RecordBatch> {
         let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef;

-        RecordBatch::try_from_iter_with_nullable(vec![("data_files", files_array, false)]).map_err(
-            |e| DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())),
-        )
+        RecordBatch::try_from_iter_with_nullable(vec![(DATA_FILES_COL_NAME, files_array, false)])
+            .map_err(|e| {
+                DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string()))
+            })
     }

     fn make_result_schema() -> ArrowSchemaRef {
         // Define a schema.
         Arc::new(ArrowSchema::new(vec![Field::new(
-            "data_files",
+            DATA_FILES_COL_NAME,
             DataType::Utf8,
             false,
         )]))
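On the writer side, the same constant keeps the hand-built result schema and the result batch in agreement, so the column name cannot drift between the two definitions. A minimal sketch of that pattern, again in plain arrow with illustrative helper names rather than the crate's real types:

use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

const DATA_FILES_COL_NAME: &str = "data_files";

// Single-column, non-nullable Utf8 schema for the writer's result stream.
fn result_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new(
        DATA_FILES_COL_NAME,
        DataType::Utf8,
        false,
    )]))
}

// Wrap serialized data-file entries into a batch under the same column name.
fn result_batch(data_files: Vec<String>) -> Result<RecordBatch, ArrowError> {
    let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef;
    RecordBatch::try_from_iter_with_nullable(vec![(DATA_FILES_COL_NAME, files_array, false)])
}

fn main() {
    let batch = result_batch(vec![r#"{"file_path":"a.parquet"}"#.to_string()]).unwrap();
    // Because both sides use the constant, the declared schema and the batch agree.
    assert_eq!(batch.schema(), result_schema());
    assert!(batch.column_by_name(DATA_FILES_COL_NAME).is_some());
}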