@@ -31,6 +31,9 @@ use arrow::csv::WriterBuilder;
3131use arrow:: datatypes:: { Fields , TimeUnit } ;
3232use datafusion:: physical_expr:: aggregate:: AggregateExprBuilder ;
3333use datafusion:: physical_plan:: coalesce_batches:: CoalesceBatchesExec ;
34+ use datafusion:: physical_plan:: node_id:: {
35+ annotate_node_id_for_execution_plan, NodeIdAnnotator ,
36+ } ;
3437use datafusion_expr:: dml:: InsertOp ;
3538use datafusion_functions_aggregate:: approx_percentile_cont:: approx_percentile_cont_udaf;
3639use datafusion_functions_aggregate:: array_agg:: array_agg_udaf;
@@ -133,13 +136,22 @@ fn roundtrip_test_and_return(
133136 ctx : & SessionContext ,
134137 codec : & dyn PhysicalExtensionCodec ,
135138) -> Result < Arc < dyn ExecutionPlan > > {
139+ let mut annotator = NodeIdAnnotator :: new ( ) ;
140+ let exec_plan = annotate_node_id_for_execution_plan ( & exec_plan, & mut annotator) ?;
136141 let proto: protobuf:: PhysicalPlanNode =
137142 protobuf:: PhysicalPlanNode :: try_from_physical_plan ( exec_plan. clone ( ) , codec)
138143 . expect ( "to proto" ) ;
139144 let runtime = ctx. runtime_env ( ) ;
140- let result_exec_plan: Arc < dyn ExecutionPlan > = proto
145+ let mut result_exec_plan: Arc < dyn ExecutionPlan > = proto
141146 . try_into_physical_plan ( ctx, runtime. deref ( ) , codec)
142147 . expect ( "from proto" ) ;
148+
149+ // Re-annotate the deserialized plan with node IDs to match the original plan structure
150+ // This ensures that the roundtrip preserves the node_id values for comparison
151+ let mut annotator = NodeIdAnnotator :: new ( ) ;
152+ result_exec_plan =
153+ annotate_node_id_for_execution_plan ( & result_exec_plan, & mut annotator) ?;
154+
143155 assert_eq ! ( format!( "{exec_plan:?}" ) , format!( "{result_exec_plan:?}" ) ) ;
144156 Ok ( result_exec_plan)
145157}
0 commit comments