Skip to content

Commit b451745

Browse files
committed
Add standalone NetworkCoalesceExec test and test with n_tasks > 1.
1 parent a1bad86 commit b451745

File tree

1 file changed

+21
-3
lines changed

1 file changed

+21
-3
lines changed

src/protobuf/distributed_codec.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,11 +536,29 @@ mod tests {
536536
}
537537

538538
#[test]
539-
fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> {
539+
fn test_roundtrip_single_flight_coalesce() -> datafusion::common::Result<()> {
540540
let codec = DistributedCodec;
541541
let registry = MemoryFunctionRegistry::new();
542542

543543
let schema = schema_i32("e");
544+
let plan: Arc<dyn ExecutionPlan> =
545+
Arc::new(new_network_coalesce_tasks_exec(Partitioning::RoundRobinBatch(3), schema, dummy_stage()));
546+
547+
let mut buf = Vec::new();
548+
codec.try_encode(plan.clone(), &mut buf)?;
549+
550+
let decoded = codec.try_decode(&buf, &[empty_exec()], &registry)?;
551+
assert_eq!(repr(&plan), repr(&decoded));
552+
553+
Ok(())
554+
}
555+
556+
#[test]
557+
fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> {
558+
let codec = DistributedCodec;
559+
let registry = MemoryFunctionRegistry::new();
560+
561+
let schema = schema_i32("f");
544562
let flight = Arc::new(new_network_coalesce_tasks_exec(
545563
Partitioning::UnknownPartitioning(1),
546564
schema,
@@ -564,7 +582,7 @@ mod tests {
564582
let codec = DistributedCodec;
565583
let registry = MemoryFunctionRegistry::new();
566584

567-
let schema = schema_i32("c");
585+
let schema = schema_i32("g");
568586
let left = Arc::new(new_network_coalesce_tasks_exec(
569587
Partitioning::RoundRobinBatch(2),
570588
schema.clone(),
@@ -578,7 +596,7 @@ mod tests {
578596

579597
let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
580598
let plan: Arc<dyn ExecutionPlan> =
581-
Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 1)?);
599+
Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 3)?);
582600

583601
let mut buf = Vec::new();
584602
codec.try_encode(plan.clone(), &mut buf)?;

0 commit comments

Comments
 (0)