Skip to content

Commit 875a829

Browse files
committed
Fix serialization bug
1 parent ccf36a1 commit 875a829

File tree

1 file changed

+16
-8
lines changed

1 file changed

+16
-8
lines changed

src/plan/codec.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,22 +84,30 @@ impl PhysicalExtensionCodec for DistributedCodec {
8484
buf: &mut Vec<u8>,
8585
) -> datafusion::common::Result<()> {
8686
if let Some(node) = node.as_any().downcast_ref::<ArrowFlightReadExec>() {
87-
ArrowFlightReadExecProto {
87+
let inner = ArrowFlightReadExecProto {
8888
schema: Some(node.schema().try_into()?),
8989
partitioning: Some(serialize_partitioning(
9090
node.properties().output_partitioning(),
9191
&DistributedCodec {},
9292
)?),
9393
stage_num: node.stage_num as u64,
94-
}
95-
.encode(buf)
96-
.map_err(|err| proto_error(format!("{err}")))
94+
};
95+
96+
let wrapper = DistributedExecProto {
97+
node: Some(DistributedExecNode::ArrowFlightReadExec(inner)),
98+
};
99+
100+
wrapper.encode(buf).map_err(|e| proto_error(format!("{e}")))
97101
} else if let Some(node) = node.as_any().downcast_ref::<PartitionIsolatorExec>() {
98-
PartitionIsolatorExecProto {
102+
let inner = PartitionIsolatorExecProto {
99103
partition_count: node.partition_count as u64,
100-
}
101-
.encode(buf)
102-
.map_err(|err| proto_error(format!("{err}")))
104+
};
105+
106+
let wrapper = DistributedExecProto {
107+
node: Some(DistributedExecNode::PartitionIsolatorExec(inner)),
108+
};
109+
110+
wrapper.encode(buf).map_err(|e| proto_error(format!("{e}")))
103111
} else {
104112
Err(proto_error(format!("Unexpected plan {}", node.name())))
105113
}

0 commit comments

Comments
 (0)