diff --git a/src/protobuf/distributed_codec.rs b/src/protobuf/distributed_codec.rs index 29a7173..59025e7 100644 --- a/src/protobuf/distributed_codec.rs +++ b/src/protobuf/distributed_codec.rs @@ -534,4 +534,79 @@ mod tests { Ok(()) } + + #[test] + fn test_roundtrip_single_flight_coalesce() -> datafusion::common::Result<()> { + let codec = DistributedCodec; + let registry = MemoryFunctionRegistry::new(); + + let schema = schema_i32("e"); + let plan: Arc = Arc::new(new_network_coalesce_tasks_exec( + Partitioning::RoundRobinBatch(3), + schema, + dummy_stage(), + )); + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let decoded = codec.try_decode(&buf, &[empty_exec()], ®istry)?; + assert_eq!(repr(&plan), repr(&decoded)); + + Ok(()) + } + + #[test] + fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> { + let codec = DistributedCodec; + let registry = MemoryFunctionRegistry::new(); + + let schema = schema_i32("f"); + let flight = Arc::new(new_network_coalesce_tasks_exec( + Partitioning::UnknownPartitioning(1), + schema, + dummy_stage(), + )); + + let plan: Arc = + Arc::new(PartitionIsolatorExec::new_ready(flight.clone(), 1)?); + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let decoded = codec.try_decode(&buf, &[flight], ®istry)?; + assert_eq!(repr(&plan), repr(&decoded)); + + Ok(()) + } + + #[test] + fn test_roundtrip_isolator_union_coalesce() -> datafusion::common::Result<()> { + let codec = DistributedCodec; + let registry = MemoryFunctionRegistry::new(); + + let schema = schema_i32("g"); + let left = Arc::new(new_network_coalesce_tasks_exec( + Partitioning::RoundRobinBatch(2), + schema.clone(), + dummy_stage(), + )); + let right = Arc::new(new_network_coalesce_tasks_exec( + Partitioning::RoundRobinBatch(2), + schema.clone(), + dummy_stage(), + )); + + let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()])); + let plan: Arc = + Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 3)?); + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let decoded = codec.try_decode(&buf, &[union], ®istry)?; + assert_eq!(repr(&plan), repr(&decoded)); + + Ok(()) + } }