
Commit de012b4

Error handling and minor fixes for ShuffleCodec
1 parent 31f8833 commit de012b4

1 file changed (+10 −13)

src/shuffle/codec.rs
@@ -20,14 +20,13 @@ use crate::protobuf::{RaySqlExecNode, ShuffleReaderExecNode, ShuffleWriterExecNo
 use crate::shuffle::{ShuffleReaderExec, ShuffleWriterExec};
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::common::{DataFusionError, Result};
-use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
 use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
+use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
-use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
-use datafusion_proto::protobuf::{self, PhysicalHashRepartition, PhysicalPlanNode};
+use datafusion_proto::protobuf::{self, PhysicalHashRepartition};
 use prost::Message;
 use std::sync::Arc;
 

@@ -38,7 +37,7 @@ impl PhysicalExtensionCodec for ShuffleCodec {
     fn try_decode(
         &self,
         buf: &[u8],
-        _inputs: &[Arc<dyn ExecutionPlan>],
+        inputs: &[Arc<dyn ExecutionPlan>],
         registry: &dyn FunctionRegistry,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         // decode bytes to protobuf struct
@@ -63,11 +62,7 @@ impl PhysicalExtensionCodec for ShuffleCodec {
                 )))
             }
             Some(PlanType::ShuffleWriter(writer)) => {
-                let plan = writer.plan.unwrap().try_into_physical_plan(
-                    registry,
-                    &RuntimeEnv::default(),
-                    self,
-                )?;
+                let plan = inputs[0].clone();
                 let hash_part = parse_protobuf_hash_partitioning(
                     writer.partitioning.as_ref(),
                     registry,
@@ -81,7 +76,7 @@ impl PhysicalExtensionCodec for ShuffleCodec {
                     &writer.shuffle_dir,
                 )))
             }
-            _ => unreachable!(),
+            _ => Err(DataFusionError::Execution("Missing or unexpected plan_type".into()))
         }
     }
 
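The two hunks above are the core of the change: the decoder now reuses the child plan that the planner has already decoded and passed in via `inputs`, and the catch-all `unreachable!()` becomes a recoverable `DataFusionError`. A minimal sketch of that pattern, separate from the project's code (the helper name `child_from_inputs` is hypothetical):

use std::sync::Arc;

use datafusion::common::{DataFusionError, Result};
use datafusion::physical_plan::ExecutionPlan;

/// Returns the single child plan that was already decoded by the framework and
/// handed to `PhysicalExtensionCodec::try_decode`, or a recoverable error
/// instead of panicking when no input is present.
fn child_from_inputs(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<Arc<dyn ExecutionPlan>> {
    inputs.first().cloned().ok_or_else(|| {
        DataFusionError::Execution("ShuffleWriterExecNode expects exactly one input plan".to_string())
    })
}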

@@ -102,18 +97,20 @@ impl PhysicalExtensionCodec for ShuffleCodec {
             };
             PlanType::ShuffleReader(reader)
         } else if let Some(writer) = node.as_any().downcast_ref::<ShuffleWriterExec>() {
-            let plan = PhysicalPlanNode::try_from_physical_plan(writer.plan.clone(), self)?;
             let partitioning =
                 encode_partitioning_scheme(writer.properties().output_partitioning())?;
             let writer = ShuffleWriterExecNode {
                 stage_id: writer.stage_id as u32,
-                plan: Some(plan),
+                // No need to redundantly serialize the child plan, as input plan(s) are recursively
+                // serialized by PhysicalPlanNode and will be available as `inputs` in `try_decode`.
+                // TODO: remove this field from the proto definition?
+                plan: None,
                 partitioning: Some(partitioning),
                 shuffle_dir: writer.shuffle_dir.clone(),
             };
             PlanType::ShuffleWriter(writer)
         } else {
-            unreachable!()
+            return Err(DataFusionError::Execution(format!("Unsupported plan node: {}", node.name())))
         };
         plan.encode(buf);
         Ok(())
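As the new comment in `try_encode` explains, `PhysicalPlanNode` serializes a node's children recursively and hands them back to the extension codec as `inputs` during decoding, so `ShuffleWriterExecNode` can leave its `plan` field empty. A hedged round-trip sketch of that mechanism, assuming a `SessionContext` is available to act as the `FunctionRegistry` (the `roundtrip` helper is illustrative and not part of this commit):

use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::context::SessionContext;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::protobuf::PhysicalPlanNode;

/// Round-trips a physical plan through its protobuf representation.
/// `PhysicalPlanNode` encodes the plan's children itself; on decode it rebuilds
/// them first and passes them to the codec's `try_decode` as `inputs`, which is
/// why the writer node no longer needs to carry its own serialized child plan.
fn roundtrip(
    ctx: &SessionContext,
    plan: Arc<dyn ExecutionPlan>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
    let proto = PhysicalPlanNode::try_from_physical_plan(plan, codec)?;
    let runtime = RuntimeEnv::default();
    // SessionState implements FunctionRegistry.
    proto.try_into_physical_plan(&ctx.state(), &runtime, codec)
}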
