From a1bad86132e3a15cb1cf6860fb3ce97f9da150a0 Mon Sep 17 00:00:00 2001 From: Justin O'Dwyer Date: Thu, 6 Nov 2025 16:48:38 -0500 Subject: [PATCH 1/2] Add tests for NetworkCoalesceExec. --- src/protobuf/distributed_codec.rs | 54 +++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/protobuf/distributed_codec.rs b/src/protobuf/distributed_codec.rs index 29a7173..e1f418c 100644 --- a/src/protobuf/distributed_codec.rs +++ b/src/protobuf/distributed_codec.rs @@ -534,4 +534,58 @@ mod tests { Ok(()) } + + #[test] + fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> { + let codec = DistributedCodec; + let registry = MemoryFunctionRegistry::new(); + + let schema = schema_i32("e"); + let flight = Arc::new(new_network_coalesce_tasks_exec( + Partitioning::UnknownPartitioning(1), + schema, + dummy_stage(), + )); + + let plan: Arc = + Arc::new(PartitionIsolatorExec::new_ready(flight.clone(), 1)?); + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let decoded = codec.try_decode(&buf, &[flight], ®istry)?; + assert_eq!(repr(&plan), repr(&decoded)); + + Ok(()) + } + + #[test] + fn test_roundtrip_isolator_union_coalesce() -> datafusion::common::Result<()> { + let codec = DistributedCodec; + let registry = MemoryFunctionRegistry::new(); + + let schema = schema_i32("c"); + let left = Arc::new(new_network_coalesce_tasks_exec( + Partitioning::RoundRobinBatch(2), + schema.clone(), + dummy_stage(), + )); + let right = Arc::new(new_network_coalesce_tasks_exec( + Partitioning::RoundRobinBatch(2), + schema.clone(), + dummy_stage(), + )); + + let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()])); + let plan: Arc = + Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 1)?); + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let decoded = codec.try_decode(&buf, &[union], ®istry)?; + assert_eq!(repr(&plan), repr(&decoded)); + + Ok(()) + } } From 7f891495c426e7a0d52dc118095e51ca22f4088c Mon Sep 17 00:00:00 2001 From: Justin O'Dwyer Date: Fri, 7 Nov 2025 11:21:34 -0500 Subject: [PATCH 2/2] Add standalone NetworkCoalesceExec test and test with n_tasks > 1. --- src/protobuf/distributed_codec.rs | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/src/protobuf/distributed_codec.rs b/src/protobuf/distributed_codec.rs index e1f418c..59025e7 100644 --- a/src/protobuf/distributed_codec.rs +++ b/src/protobuf/distributed_codec.rs @@ -536,11 +536,32 @@ mod tests { } #[test] - fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> { + fn test_roundtrip_single_flight_coalesce() -> datafusion::common::Result<()> { let codec = DistributedCodec; let registry = MemoryFunctionRegistry::new(); let schema = schema_i32("e"); + let plan: Arc = Arc::new(new_network_coalesce_tasks_exec( + Partitioning::RoundRobinBatch(3), + schema, + dummy_stage(), + )); + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let decoded = codec.try_decode(&buf, &[empty_exec()], ®istry)?; + assert_eq!(repr(&plan), repr(&decoded)); + + Ok(()) + } + + #[test] + fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> { + let codec = DistributedCodec; + let registry = MemoryFunctionRegistry::new(); + + let schema = schema_i32("f"); let flight = Arc::new(new_network_coalesce_tasks_exec( Partitioning::UnknownPartitioning(1), schema, @@ -564,7 +585,7 @@ mod tests { let codec = DistributedCodec; let registry = MemoryFunctionRegistry::new(); - let schema = schema_i32("c"); + let schema = schema_i32("g"); let left = Arc::new(new_network_coalesce_tasks_exec( Partitioning::RoundRobinBatch(2), schema.clone(), @@ -578,7 +599,7 @@ mod tests { let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()])); let plan: Arc = - Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 1)?); + Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 3)?); let mut buf = Vec::new(); codec.try_encode(plan.clone(), &mut buf)?;