Skip to content

Commit 7f89149

Browse files
committed
Add standalone NetworkCoalesceExec test and test with n_tasks > 1.
1 parent a1bad86 commit 7f89149

File tree

1 file changed

+24
-3
lines changed

1 file changed

+24
-3
lines changed

src/protobuf/distributed_codec.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,11 +536,32 @@ mod tests {
536536
}
537537

538538
#[test]
539-
fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> {
539+
fn test_roundtrip_single_flight_coalesce() -> datafusion::common::Result<()> {
540540
let codec = DistributedCodec;
541541
let registry = MemoryFunctionRegistry::new();
542542

543543
let schema = schema_i32("e");
544+
let plan: Arc<dyn ExecutionPlan> = Arc::new(new_network_coalesce_tasks_exec(
545+
Partitioning::RoundRobinBatch(3),
546+
schema,
547+
dummy_stage(),
548+
));
549+
550+
let mut buf = Vec::new();
551+
codec.try_encode(plan.clone(), &mut buf)?;
552+
553+
let decoded = codec.try_decode(&buf, &[empty_exec()], &registry)?;
554+
assert_eq!(repr(&plan), repr(&decoded));
555+
556+
Ok(())
557+
}
558+
559+
#[test]
560+
fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> {
561+
let codec = DistributedCodec;
562+
let registry = MemoryFunctionRegistry::new();
563+
564+
let schema = schema_i32("f");
544565
let flight = Arc::new(new_network_coalesce_tasks_exec(
545566
Partitioning::UnknownPartitioning(1),
546567
schema,
@@ -564,7 +585,7 @@ mod tests {
564585
let codec = DistributedCodec;
565586
let registry = MemoryFunctionRegistry::new();
566587

567-
let schema = schema_i32("c");
588+
let schema = schema_i32("g");
568589
let left = Arc::new(new_network_coalesce_tasks_exec(
569590
Partitioning::RoundRobinBatch(2),
570591
schema.clone(),
@@ -578,7 +599,7 @@ mod tests {
578599

579600
let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
580601
let plan: Arc<dyn ExecutionPlan> =
581-
Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 1)?);
602+
Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 3)?);
582603

583604
let mut buf = Vec::new();
584605
codec.try_encode(plan.clone(), &mut buf)?;

0 commit comments

Comments
 (0)