2626pub  mod  configs; 
2727pub ( crate )  mod  execution; 
2828pub ( crate )  mod  generated_columns; 
29- pub  mod  lazy ; 
29+ pub ( crate )  mod  metrics ; 
3030pub ( crate )  mod  schema_evolution; 
3131
3232use  arrow_schema:: Schema ; 
3333pub  use  configs:: WriterStatsConfig ; 
3434use  datafusion:: execution:: SessionStateBuilder ; 
3535use  generated_columns:: { add_generated_columns,  add_missing_generated_columns} ; 
36+ use  metrics:: { WriteMetricExtensionPlanner ,  SOURCE_COUNT_ID ,  SOURCE_COUNT_METRIC } ; 
3637use  std:: collections:: HashMap ; 
3738use  std:: str:: FromStr ; 
3839use  std:: sync:: Arc ; 
@@ -45,7 +46,7 @@ use datafusion::datasource::MemTable;
4546use  datafusion:: execution:: context:: { SessionContext ,  SessionState } ; 
4647use  datafusion:: prelude:: DataFrame ; 
4748use  datafusion_common:: { Column ,  DFSchema ,  Result ,  ScalarValue } ; 
48- use  datafusion_expr:: { cast,  lit,  Expr ,  LogicalPlan } ; 
49+ use  datafusion_expr:: { cast,  lit,  try_cast ,   Expr ,   Extension ,  LogicalPlan } ; 
4950use  execution:: { prepare_predicate_actions,  write_execution_plan_with_predicate} ; 
5051use  futures:: future:: BoxFuture ; 
5152use  parquet:: file:: properties:: WriterProperties ; 
@@ -58,6 +59,9 @@ use super::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOC
5859use  super :: { CreateBuilder ,  CustomExecuteHandler ,  Operation } ; 
5960use  crate :: delta_datafusion:: expr:: fmt_expr_to_sql; 
6061use  crate :: delta_datafusion:: expr:: parse_predicate_expression; 
62+ use  crate :: delta_datafusion:: logical:: MetricObserver ; 
63+ use  crate :: delta_datafusion:: physical:: { find_metric_node,  get_metric} ; 
64+ use  crate :: delta_datafusion:: planner:: DeltaPlanner ; 
6165use  crate :: delta_datafusion:: register_store; 
6266use  crate :: delta_datafusion:: DataFusionMixins ; 
6367use  crate :: errors:: { DeltaResult ,  DeltaTableError } ; 
@@ -418,16 +422,25 @@ impl std::future::IntoFuture for WriteBuilder {
418422            let  mut  metrics = WriteMetrics :: default ( ) ; 
419423            let  exec_start = Instant :: now ( ) ; 
420424
425+             let  write_planner = DeltaPlanner :: < WriteMetricExtensionPlanner >  { 
426+                 extension_planner :  WriteMetricExtensionPlanner  { } , 
427+             } ; 
428+ 
421429            // Create table actions to initialize table in case it does not yet exist 
422430            // and should be created 
423431            let  mut  actions = this. check_preconditions ( ) . await ?; 
424432
425433            let  partition_columns = this. get_partition_columns ( ) ?; 
426434
427435            let  state = match  this. state  { 
428-                 Some ( state)  => state, 
436+                 Some ( state)  => SessionStateBuilder :: new_from_existing ( state. clone ( ) ) 
437+                     . with_query_planner ( Arc :: new ( write_planner) ) 
438+                     . build ( ) , 
429439                None  => { 
430-                     let  state = SessionStateBuilder :: new ( ) . with_default_features ( ) . build ( ) ; 
440+                     let  state = SessionStateBuilder :: new ( ) 
441+                         . with_default_features ( ) 
442+                         . with_query_planner ( Arc :: new ( write_planner) ) 
443+                         . build ( ) ; 
431444                    register_store ( this. log_store . clone ( ) ,  state. runtime_env ( ) . clone ( ) ) ; 
432445                    state
433446                } 
@@ -491,7 +504,8 @@ impl std::future::IntoFuture for WriteBuilder {
491504                for  field in  new_schema. fields ( )  { 
492505                    // If field exist in source data, we cast to new datatype 
493506                    if  source_schema. index_of ( field. name ( ) ) . is_ok ( )  { 
494-                         let  cast_expr = cast ( 
507+                         let  cast_fn = if  this. safe_cast  {  try_cast }  else  {  cast } ; 
508+                         let  cast_expr = cast_fn ( 
495509                            Expr :: Column ( Column :: from_name ( field. name ( ) ) ) , 
496510                            // col(field.name()), 
497511                            field. data_type ( ) . clone ( ) , 
@@ -520,6 +534,16 @@ impl std::future::IntoFuture for WriteBuilder {
520534                & state, 
521535            ) ?; 
522536
537+             let  source = LogicalPlan :: Extension ( Extension  { 
538+                 node :  Arc :: new ( MetricObserver  { 
539+                     id :  "write_source_count" . into ( ) , 
540+                     input :  source. logical_plan ( ) . clone ( ) , 
541+                     enable_pushdown :  false , 
542+                 } ) , 
543+             } ) ; 
544+ 
545+             let  source = DataFrame :: new ( state. clone ( ) ,  source) ; 
546+ 
523547            let  schema = Arc :: new ( source. schema ( ) . as_arrow ( ) . clone ( ) ) ; 
524548
525549            // Maybe create schema action 
@@ -576,21 +600,31 @@ impl std::future::IntoFuture for WriteBuilder {
576600                stats_columns, 
577601            } ; 
578602
603+             let  source_plan = source. clone ( ) . create_physical_plan ( ) . await ?; 
604+ 
579605            // Here we need to validate if the new data conforms to a predicate if one is provided 
580606            let  add_actions = write_execution_plan_with_predicate ( 
581607                predicate. clone ( ) , 
582608                this. snapshot . as_ref ( ) , 
583609                state. clone ( ) , 
584-                 source . clone ( ) . create_physical_plan ( ) . await ? , 
610+                 source_plan . clone ( ) , 
585611                partition_columns. clone ( ) , 
586612                this. log_store . object_store ( Some ( operation_id) ) . clone ( ) , 
587613                target_file_size, 
588614                this. write_batch_size , 
589615                this. writer_properties . clone ( ) , 
590616                writer_stats_config. clone ( ) , 
591-                 None , 
592617            ) 
593618            . await ?; 
619+ 
620+             let  source_count =
621+                 find_metric_node ( SOURCE_COUNT_ID ,  & source_plan) . ok_or_else ( || { 
622+                     DeltaTableError :: Generic ( "Unable to locate expected metric node" . into ( ) ) 
623+                 } ) ?; 
624+             let  source_count_metrics = source_count. metrics ( ) . unwrap ( ) ; 
625+             let  num_added_rows = get_metric ( & source_count_metrics,  SOURCE_COUNT_METRIC ) ; 
626+             metrics. num_added_rows  = num_added_rows; 
627+ 
594628            metrics. num_added_files  = add_actions. len ( ) ; 
595629            actions. extend ( add_actions) ; 
596630
@@ -989,7 +1023,6 @@ mod tests {
9891023        assert_eq ! ( table. version( ) ,  0 ) ; 
9901024        assert_eq ! ( table. get_files_count( ) ,  2 ) ; 
9911025        let  write_metrics:  WriteMetrics  = get_write_metrics ( table. clone ( ) ) . await ; 
992-         assert ! ( write_metrics. num_partitions > 0 ) ; 
9931026        assert_eq ! ( write_metrics. num_added_files,  2 ) ; 
9941027        assert_common_write_metrics ( write_metrics) ; 
9951028
@@ -1003,7 +1036,6 @@ mod tests {
10031036        assert_eq ! ( table. get_files_count( ) ,  4 ) ; 
10041037
10051038        let  write_metrics:  WriteMetrics  = get_write_metrics ( table. clone ( ) ) . await ; 
1006-         assert ! ( write_metrics. num_partitions > 0 ) ; 
10071039        assert_eq ! ( write_metrics. num_added_files,  4 ) ; 
10081040        assert_common_write_metrics ( write_metrics) ; 
10091041    } 
@@ -1093,7 +1125,6 @@ mod tests {
10931125        assert_eq ! ( table. version( ) ,  0 ) ; 
10941126
10951127        let  write_metrics:  WriteMetrics  = get_write_metrics ( table. clone ( ) ) . await ; 
1096-         assert ! ( write_metrics. num_partitions > 0 ) ; 
10971128        assert_common_write_metrics ( write_metrics) ; 
10981129
10991130        let  mut  new_schema_builder = arrow_schema:: SchemaBuilder :: new ( ) ; 
@@ -1146,7 +1177,6 @@ mod tests {
11461177        assert_eq ! ( part_cols,  vec![ "id" ,  "value" ] ) ;  // we want to preserve partitions 
11471178
11481179        let  write_metrics:  WriteMetrics  = get_write_metrics ( table. clone ( ) ) . await ; 
1149-         assert ! ( write_metrics. num_partitions > 0 ) ; 
11501180        assert_common_write_metrics ( write_metrics) ; 
11511181    } 
11521182
@@ -1668,7 +1698,6 @@ mod tests {
16681698        assert_eq ! ( table. version( ) ,  1 ) ; 
16691699        let  write_metrics:  WriteMetrics  = get_write_metrics ( table. clone ( ) ) . await ; 
16701700        assert_eq ! ( write_metrics. num_added_rows,  3 ) ; 
1671-         assert ! ( write_metrics. num_partitions > 0 ) ; 
16721701        assert_common_write_metrics ( write_metrics) ; 
16731702
16741703        let  table = DeltaOps ( table) 
@@ -1680,7 +1709,6 @@ mod tests {
16801709        assert_eq ! ( table. version( ) ,  2 ) ; 
16811710        let  write_metrics:  WriteMetrics  = get_write_metrics ( table. clone ( ) ) . await ; 
16821711        assert_eq ! ( write_metrics. num_added_rows,  1 ) ; 
1683-         assert ! ( write_metrics. num_partitions > 0 ) ; 
16841712        assert ! ( write_metrics. num_removed_files > 0 ) ; 
16851713        assert_common_write_metrics ( write_metrics) ; 
16861714
0 commit comments