Skip to content

Commit bee1a8f

Browse files
authored
Add testing for distributed codec (#217)
* Add tests for NetworkCoalesceExec. * Add standalone NetworkCoalesceExec test and test with n_tasks > 1.
1 parent 137fbd7 commit bee1a8f

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed

src/protobuf/distributed_codec.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,4 +534,79 @@ mod tests {
534534

535535
Ok(())
536536
}
537+
538+
#[test]
539+
fn test_roundtrip_single_flight_coalesce() -> datafusion::common::Result<()> {
540+
let codec = DistributedCodec;
541+
let registry = MemoryFunctionRegistry::new();
542+
543+
let schema = schema_i32("e");
544+
let plan: Arc<dyn ExecutionPlan> = Arc::new(new_network_coalesce_tasks_exec(
545+
Partitioning::RoundRobinBatch(3),
546+
schema,
547+
dummy_stage(),
548+
));
549+
550+
let mut buf = Vec::new();
551+
codec.try_encode(plan.clone(), &mut buf)?;
552+
553+
let decoded = codec.try_decode(&buf, &[empty_exec()], &registry)?;
554+
assert_eq!(repr(&plan), repr(&decoded));
555+
556+
Ok(())
557+
}
558+
559+
#[test]
560+
fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> {
561+
let codec = DistributedCodec;
562+
let registry = MemoryFunctionRegistry::new();
563+
564+
let schema = schema_i32("f");
565+
let flight = Arc::new(new_network_coalesce_tasks_exec(
566+
Partitioning::UnknownPartitioning(1),
567+
schema,
568+
dummy_stage(),
569+
));
570+
571+
let plan: Arc<dyn ExecutionPlan> =
572+
Arc::new(PartitionIsolatorExec::new_ready(flight.clone(), 1)?);
573+
574+
let mut buf = Vec::new();
575+
codec.try_encode(plan.clone(), &mut buf)?;
576+
577+
let decoded = codec.try_decode(&buf, &[flight], &registry)?;
578+
assert_eq!(repr(&plan), repr(&decoded));
579+
580+
Ok(())
581+
}
582+
583+
#[test]
584+
fn test_roundtrip_isolator_union_coalesce() -> datafusion::common::Result<()> {
585+
let codec = DistributedCodec;
586+
let registry = MemoryFunctionRegistry::new();
587+
588+
let schema = schema_i32("g");
589+
let left = Arc::new(new_network_coalesce_tasks_exec(
590+
Partitioning::RoundRobinBatch(2),
591+
schema.clone(),
592+
dummy_stage(),
593+
));
594+
let right = Arc::new(new_network_coalesce_tasks_exec(
595+
Partitioning::RoundRobinBatch(2),
596+
schema.clone(),
597+
dummy_stage(),
598+
));
599+
600+
let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
601+
let plan: Arc<dyn ExecutionPlan> =
602+
Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 3)?);
603+
604+
let mut buf = Vec::new();
605+
codec.try_encode(plan.clone(), &mut buf)?;
606+
607+
let decoded = codec.try_decode(&buf, &[union], &registry)?;
608+
assert_eq!(repr(&plan), repr(&decoded));
609+
610+
Ok(())
611+
}
537612
}

0 commit comments

Comments
 (0)