Commit e5a284e

Add distributed plan and stages to EXPLAIN output
1 parent c59bde2 commit e5a284e

8 files changed: +845 -110 lines changed

scripts/launch_python_arrowflightsql_client.sh

Lines changed: 0 additions & 16 deletions
@@ -247,14 +247,6 @@ def explain_sql(sql_query):
         print("No explain plan returned")
         return
 
-    # Debug: show raw results
-    print("DEBUG: Raw results from EXPLAIN:")
-    print(f"DEBUG: Total rows returned: {len(results)}")
-    for i, row in enumerate(results):
-        row_content = str(row[1])[:200] + "..." if len(str(row[1])) > 200 else str(row[1])
-        print(f"DEBUG: Row {i}: [{row[0]}] = {row_content}")
-    print("DEBUG: " + "="*50)
-
     logical_plan = None
     physical_plan = None
     distributed_plan = None

@@ -322,14 +314,6 @@ def explain_analyze_sql(sql_query):
         print("No explain analyze plan returned")
         return
 
-    # Debug: show raw results
-    print("DEBUG: Raw results from EXPLAIN ANALYZE:")
-    print(f"DEBUG: Total rows returned: {len(results)}")
-    for i, row in enumerate(results):
-        row_content = str(row[1])[:200] + "..." if len(str(row[1])) > 200 else str(row[1])
-        print(f"DEBUG: Row {i}: [{row[0]}] = {row_content}")
-    print("DEBUG: " + "="*50)
-
     plan_with_metrics = None
 
     for row in results:

src/explain.rs

Lines changed: 182 additions & 0 deletions
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    fmt::Formatter,
+    sync::Arc,
+};
+
+use arrow::{
+    array::StringArray,
+    datatypes::SchemaRef,
+    record_batch::RecordBatch,
+};
+use datafusion::{
+    execution::TaskContext,
+    physical_plan::{
+        execution_plan::{Boundedness, EmissionType},
+        memory::MemoryStream,
+        ExecutionPlan, Partitioning,
+        PlanProperties, DisplayAs, DisplayFormatType,
+        SendableRecordBatchStream, displayable,
+    },
+    physical_expr::EquivalenceProperties,
+};
+
+/// Custom distributed EXPLAIN execution plan that also returns distributed plan and stages
+#[derive(Debug)]
+pub struct DistributedExplainExec {
+    schema: SchemaRef,
+    logical_plan: String,
+    physical_plan: String,
+    distributed_plan: String,
+    distributed_stages: String,
+    properties: PlanProperties,
+}
+
+impl DistributedExplainExec {
+    pub fn new(
+        schema: SchemaRef,
+        logical_plan: String,
+        physical_plan: String,
+        distributed_plan: String,
+        distributed_stages: String,
+    ) -> Self {
+        // properties required by the ExecutionPlan trait
+        let properties = PlanProperties::new(
+            EquivalenceProperties::new(schema.clone()),
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+
+        Self {
+            schema,
+            logical_plan,
+            physical_plan,
+            distributed_plan,
+            distributed_stages,
+            properties,
+        }
+    }
+
+    pub fn logical_plan(&self) -> &str {
+        &self.logical_plan
+    }
+
+    pub fn physical_plan(&self) -> &str {
+        &self.physical_plan
+    }
+
+    pub fn distributed_plan(&self) -> &str {
+        &self.distributed_plan
+    }
+
+    pub fn distributed_stages(&self) -> &str {
+        &self.distributed_stages
+    }
+
+    /// Format distributed stages for display
+    pub fn format_distributed_stages(stages: &[crate::planning::DFRayStage]) -> String {
+        let mut result = String::new();
+        for (i, stage) in stages.iter().enumerate() {
+            result.push_str(&format!("Stage {}:\n", stage.stage_id));
+            result.push_str(&format!(" Partition Groups: {:?}\n", stage.partition_groups));
+            result.push_str(&format!(" Full Partitions: {}\n", stage.full_partitions));
+            result.push_str(" Plan:\n");
+            let plan_display = format!("{}", displayable(stage.plan.as_ref()).indent(true));
+            for line in plan_display.lines() {
+                result.push_str(&format!(" {}\n", line));
+            }
+            if i < stages.len() - 1 {
+                result.push('\n');
+            }
+        }
+        if result.is_empty() {
+            result.push_str("No distributed stages generated");
+        }
+        result
+    }
+}
+
+impl DisplayAs for DistributedExplainExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "DistributedExplainExec")
+    }
+}
+
+impl ExecutionPlan for DistributedExplainExec {
+    fn name(&self) -> &str {
+        "DistributedExplainExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> datafusion::error::Result<SendableRecordBatchStream> {
+        let schema = self.schema.clone();
+
+        // Create the result data with our 4 plan types
+        let plan_types = StringArray::from(vec![
+            "logical_plan",
+            "physical_plan",
+            "distributed_plan",
+            "distributed_stages"
+        ]);
+        let plans = StringArray::from(vec![
+            self.logical_plan.as_str(),
+            self.physical_plan.as_str(),
+            self.distributed_plan.as_str(),
+            self.distributed_stages.as_str(),
+        ]);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(plan_types), Arc::new(plans)],
+        ).map_err(|e| datafusion::error::DataFusionError::ArrowError(e, None))?;
+
+        // Use MemoryStream which is designed for DataFusion execution plans
+        let stream = MemoryStream::try_new(vec![batch], schema, None)?;
+
+        Ok(Box::pin(stream))
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
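
For orientation, here is a minimal sketch (not part of the commit) of how DistributedExplainExec could be exercised on its own. The datafusion_ray crate path, the tokio runtime, and the two-column plan_type/plan schema are assumptions for illustration; the commit itself only defines the operator.

// Hypothetical driver for DistributedExplainExec; crate path and schema are assumed.
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{common, ExecutionPlan};
use datafusion_ray::explain::DistributedExplainExec;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Assumed EXPLAIN result schema: one string column for the plan type, one for the plan text.
    let schema = Arc::new(Schema::new(vec![
        Field::new("plan_type", DataType::Utf8, false),
        Field::new("plan", DataType::Utf8, false),
    ]));

    let exec = DistributedExplainExec::new(
        schema.clone(),
        "logical plan text".to_string(),
        "physical plan text".to_string(),
        "distributed plan text".to_string(),
        "distributed stages text".to_string(),
    );

    // Single partition; the operator returns one batch holding the four plan rows.
    let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
    let batches = common::collect(stream).await?;
    assert_eq!(batches[0].num_rows(), 4);
    Ok(())
}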

src/k8s.rs

Lines changed: 12 additions & 1 deletion
@@ -21,7 +21,18 @@ static WORKER_DISCOVERY: OnceLock<Result<WorkerDiscovery>> = OnceLock::new();
 
 pub fn get_worker_addresses() -> Result<Vec<(String, String)>> {
     match WORKER_DISCOVERY.get_or_init(WorkerDiscovery::new) {
-        Ok(wd) => Ok(wd.get_addresses()),
+        Ok(wd) => {
+            let worker_addrs = wd.get_addresses();
+            debug!(
+                "Worker addresses found:\n{}",
+                worker_addrs
+                    .iter()
+                    .map(|(name, addr)| format!("{}: {}", name, addr))
+                    .collect::<Vec<_>>()
+                    .join("\n")
+            );
+            Ok(worker_addrs)
+        },
         Err(e) => Err(anyhow!("Failed to initialize WorkerDiscovery: {}", e).into()),
     }
 }

src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ mod proto;
 pub use proto::generated::protobuf;
 
 pub mod codec;
+pub mod explain;
 pub mod flight;
 pub mod friendly;
 pub mod isolator;

src/planning.rs

Lines changed: 8 additions & 4 deletions
@@ -45,6 +45,7 @@ use crate::{
     vocab::{Addrs, CtxName, CtxPartitionGroup, CtxStageAddrs},
 };
 
+#[derive(Debug)]
 pub struct DFRayStage {
     /// our stage id
     pub stage_id: u64,

@@ -239,11 +240,12 @@ pub async fn physical_planning(
     Ok(physical_plan)
 }
 
+/// Returns distributed plan and execution stages for both query execution and EXPLAIN display
 pub async fn execution_planning(
     physical_plan: Arc<dyn ExecutionPlan>,
     batch_size: usize,
     partitions_per_worker: Option<usize>,
-) -> Result<Vec<DFRayStage>> {
+) -> Result<(Arc<dyn ExecutionPlan>, Vec<DFRayStage>)> {
     let mut stages = vec![];
 
     let mut partition_groups = vec![];

@@ -336,9 +338,11 @@
     // a carry over from when it was done in a physical optimizer seperate
     // step
     let optimizer = DFRayStageOptimizerRule::new();
-    let physical_plan = optimizer.optimize(physical_plan, &ConfigOptions::default())?;
+    let distributed_plan = optimizer.optimize(physical_plan, &ConfigOptions::default())?;
 
-    physical_plan.transform_up(up)?;
+    // Clone the distributed plan before transformation since we need to return it
+    let distributed_plan_clone = Arc::clone(&distributed_plan);
+    distributed_plan.transform_up(up)?;
 
     // add coalesce and max rows to last stage
     let mut last_stage = stages.pop().ok_or(anyhow!("No stages found"))?;

@@ -363,7 +367,7 @@
         .join(",\n");
     trace!("stages:{}", txt);
 
-    Ok(stages)
+    Ok((distributed_plan_clone, stages))
 }
 
 /// Distribute the stages to the workers, assigning each stage to a worker
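
Because execution_planning now returns a tuple, callers that previously consumed only the stages need a small adjustment. The sketch below is a hypothetical caller, not code from this commit: the function name, the batch-size and partition arguments, and the datafusion_ray module paths are assumed. It shows the tuple being destructured and both halves rendered as text the way the new EXPLAIN rows consume them.

// Hypothetical caller of the new execution_planning signature; paths and values are assumed.
use std::sync::Arc;

use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion_ray::explain::DistributedExplainExec;
use datafusion_ray::planning::execution_planning;

async fn plan_for_explain(
    physical_plan: Arc<dyn ExecutionPlan>,
) -> datafusion::error::Result<(String, String)> {
    // execution_planning now yields the optimized distributed plan alongside the stages.
    let (distributed_plan, stages) = execution_planning(physical_plan, 8192, Some(2))
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // Render both pieces as text for the EXPLAIN output.
    let distributed_plan_text =
        format!("{}", displayable(distributed_plan.as_ref()).indent(true));
    let distributed_stages_text = DistributedExplainExec::format_distributed_stages(&stages);

    Ok((distributed_plan_text, distributed_stages_text))
}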

src/proto/datafusion_ray.proto

Lines changed: 10 additions & 0 deletions
@@ -43,6 +43,14 @@ message ContextScanExecNode {
   datafusion_common.Schema schema = 3;
 }
 
+message DistributedExplainExecNode {
+  datafusion_common.Schema schema = 1;
+  string logical_plan = 2;
+  string physical_plan = 3;
+  string distributed_plan = 4;
+  string distributed_stages = 5;
+}
+
 message DFRayExecNode {
   oneof payload {
     DFRayStageReaderExecNode stage_reader_exec = 1;

@@ -70,6 +78,8 @@ message TicketStatementData {
   StageAddrs stage_addrs = 3;
   // the schema of the final stage
   datafusion_common.Schema schema = 4;
+  // For EXPLAIN queries, store the explain plan data directly
+  optional DistributedExplainExecNode explain_data = 5;
 }
 
 message Host {

src/proto/generated/protobuf.rs

Lines changed: 16 additions & 0 deletions
@@ -42,6 +42,19 @@ pub struct ContextScanExecNode {
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct DistributedExplainExecNode {
+    #[prost(message, optional, tag = "1")]
+    pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+    #[prost(string, tag = "2")]
+    pub logical_plan: ::prost::alloc::string::String,
+    #[prost(string, tag = "3")]
+    pub physical_plan: ::prost::alloc::string::String,
+    #[prost(string, tag = "4")]
+    pub distributed_plan: ::prost::alloc::string::String,
+    #[prost(string, tag = "5")]
+    pub distributed_stages: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct DfRayExecNode {
     #[prost(oneof = "df_ray_exec_node::Payload", tags = "1, 2, 3, 4, 5")]
     pub payload: ::core::option::Option<df_ray_exec_node::Payload>,

@@ -88,6 +101,9 @@ pub struct TicketStatementData {
     /// the schema of the final stage
     #[prost(message, optional, tag = "4")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+    /// For EXPLAIN queries, store the explain plan data directly
+    #[prost(message, optional, tag = "5")]
+    pub explain_data: ::core::option::Option<DistributedExplainExecNode>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Host {
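
A rough sketch of how the new explain_data field might be filled in when a ticket is prepared for an EXPLAIN statement. The datafusion_ray::protobuf re-export comes from src/lib.rs in this commit, but the helper function, its arguments, and leaving the remaining TicketStatementData fields at their prost defaults are assumptions, not the commit's actual call site.

// Hypothetical construction of an EXPLAIN ticket; surrounding fields left at defaults.
use datafusion_ray::protobuf::{DistributedExplainExecNode, TicketStatementData};

fn explain_ticket(
    logical: String,
    physical: String,
    distributed: String,
    stages: String,
) -> TicketStatementData {
    let explain_data = DistributedExplainExecNode {
        schema: None, // a real ticket would likely carry the serialized EXPLAIN schema here
        logical_plan: logical,
        physical_plan: physical,
        distributed_plan: distributed,
        distributed_stages: stages,
    };

    TicketStatementData {
        explain_data: Some(explain_data),
        // prost-generated messages implement Default, so the routing fields can stay empty here
        ..Default::default()
    }
}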
