From 0c4a4f11e8ba4b97b1371759b7cbeb26093acfe2 Mon Sep 17 00:00:00 2001 From: Costi Ciudatu Date: Tue, 12 Nov 2024 11:11:50 +0200 Subject: [PATCH] Error handling and minor fixes for ShuffleCodec --- src/shuffle/codec.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/shuffle/codec.rs b/src/shuffle/codec.rs index 79af0b8..752026f 100644 --- a/src/shuffle/codec.rs +++ b/src/shuffle/codec.rs @@ -20,14 +20,13 @@ use crate::protobuf::{RaySqlExecNode, ShuffleReaderExecNode, ShuffleWriterExecNo use crate::shuffle::{ShuffleReaderExec, ShuffleWriterExec}; use datafusion::arrow::datatypes::SchemaRef; use datafusion::common::{DataFusionError, Result}; -use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; use datafusion::physical_plan::{ExecutionPlan, Partitioning}; use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning; use datafusion_proto::physical_plan::to_proto::serialize_physical_expr; +use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; -use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec}; -use datafusion_proto::protobuf::{self, PhysicalHashRepartition, PhysicalPlanNode}; +use datafusion_proto::protobuf::{self, PhysicalHashRepartition}; use prost::Message; use std::sync::Arc; @@ -38,7 +37,7 @@ impl PhysicalExtensionCodec for ShuffleCodec { fn try_decode( &self, buf: &[u8], - _inputs: &[Arc<dyn ExecutionPlan>], + inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry, ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> { // decode bytes to protobuf struct @@ -63,11 +62,7 @@ impl PhysicalExtensionCodec for ShuffleCodec { ))) } Some(PlanType::ShuffleWriter(writer)) => { - let plan = writer.plan.unwrap().try_into_physical_plan( - registry, - &RuntimeEnv::default(), - self, - )?; + let plan = inputs[0].clone(); let hash_part = parse_protobuf_hash_partitioning( writer.partitioning.as_ref(), 
registry, @@ -81,7 +76,9 @@ impl PhysicalExtensionCodec for ShuffleCodec { &writer.shuffle_dir, ))) } - _ => unreachable!(), + _ => Err(DataFusionError::Execution( + "Missing or unexpected plan_type".into(), + )), } } @@ -102,18 +99,23 @@ impl PhysicalExtensionCodec for ShuffleCodec { }; PlanType::ShuffleReader(reader) } else if let Some(writer) = node.as_any().downcast_ref::<ShuffleWriterExec>() { - let plan = PhysicalPlanNode::try_from_physical_plan(writer.plan.clone(), self)?; let partitioning = encode_partitioning_scheme(writer.properties().output_partitioning())?; let writer = ShuffleWriterExecNode { stage_id: writer.stage_id as u32, - plan: Some(plan), + // No need to redundantly serialize the child plan, as input plan(s) are recursively + // serialized by PhysicalPlanNode and will be available as `inputs` in `try_decode`. + // TODO: remove this field from the proto definition? + plan: None, partitioning: Some(partitioning), shuffle_dir: writer.shuffle_dir.clone(), }; PlanType::ShuffleWriter(writer) } else { - unreachable!() + return Err(DataFusionError::Execution(format!( + "Unsupported plan node: {}", + node.name() + ))); }; plan.encode(buf); Ok(())