Skip to content

Commit 84ed49c

Browse files
authored
Merge pull request #31 from datafusion-contrib/ntran/explain
Add distributed plan and stages to EXPLAIN output
2 parents c59bde2 + cd57594 commit 84ed49c

File tree

11 files changed

+1327
-198
lines changed

11 files changed

+1327
-198
lines changed

scripts/launch_python_arrowflightsql_client.sh

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,6 @@ def explain_sql(sql_query):
247247
print("No explain plan returned")
248248
return
249249
250-
# Debug: show raw results
251-
print("DEBUG: Raw results from EXPLAIN:")
252-
print(f"DEBUG: Total rows returned: {len(results)}")
253-
for i, row in enumerate(results):
254-
row_content = str(row[1])[:200] + "..." if len(str(row[1])) > 200 else str(row[1])
255-
print(f"DEBUG: Row {i}: [{row[0]}] = {row_content}")
256-
print("DEBUG: " + "="*50)
257-
258250
logical_plan = None
259251
physical_plan = None
260252
distributed_plan = None
@@ -322,14 +314,6 @@ def explain_analyze_sql(sql_query):
322314
print("No explain analyze plan returned")
323315
return
324316
325-
# Debug: show raw results
326-
print("DEBUG: Raw results from EXPLAIN ANALYZE:")
327-
print(f"DEBUG: Total rows returned: {len(results)}")
328-
for i, row in enumerate(results):
329-
row_content = str(row[1])[:200] + "..." if len(str(row[1])) > 200 else str(row[1])
330-
print(f"DEBUG: Row {i}: [{row[0]}] = {row_content}")
331-
print("DEBUG: " + "="*50)
332-
333317
plan_with_metrics = None
334318
335319
for row in results:

src/explain.rs

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{
19+
any::Any,
20+
fmt::Formatter,
21+
sync::Arc,
22+
};
23+
24+
use arrow::{
25+
array::StringArray,
26+
datatypes::SchemaRef,
27+
record_batch::RecordBatch,
28+
};
29+
use datafusion::{
30+
execution::TaskContext,
31+
physical_plan::{
32+
execution_plan::{Boundedness, EmissionType},
33+
memory::MemoryStream,
34+
ExecutionPlan, Partitioning,
35+
PlanProperties, DisplayAs, DisplayFormatType,
36+
SendableRecordBatchStream, displayable,
37+
},
38+
physical_expr::EquivalenceProperties,
39+
};
40+
41+
/// Custom distributed EXPLAIN execution plan that also returns distributed plan and stages
42+
#[derive(Debug)]
43+
pub struct DistributedExplainExec {
44+
schema: SchemaRef,
45+
logical_plan: String,
46+
physical_plan: String,
47+
distributed_plan: String,
48+
distributed_stages: String,
49+
properties: PlanProperties,
50+
}
51+
52+
impl DistributedExplainExec {
53+
pub fn new(
54+
schema: SchemaRef,
55+
logical_plan: String,
56+
physical_plan: String,
57+
distributed_plan: String,
58+
distributed_stages: String,
59+
) -> Self {
60+
// properties required by the ExecutionPlan trait
61+
let properties = PlanProperties::new(
62+
EquivalenceProperties::new(schema.clone()),
63+
Partitioning::UnknownPartitioning(1),
64+
EmissionType::Incremental,
65+
Boundedness::Bounded,
66+
);
67+
68+
Self {
69+
schema,
70+
logical_plan,
71+
physical_plan,
72+
distributed_plan,
73+
distributed_stages,
74+
properties,
75+
}
76+
}
77+
78+
pub fn logical_plan(&self) -> &str {
79+
&self.logical_plan
80+
}
81+
82+
pub fn physical_plan(&self) -> &str {
83+
&self.physical_plan
84+
}
85+
86+
pub fn distributed_plan(&self) -> &str {
87+
&self.distributed_plan
88+
}
89+
90+
pub fn distributed_stages(&self) -> &str {
91+
&self.distributed_stages
92+
}
93+
94+
/// Format distributed stages for display
95+
pub fn format_distributed_stages(stages: &[crate::planning::DFRayStage]) -> String {
96+
let mut result = String::new();
97+
for (i, stage) in stages.iter().enumerate() {
98+
result.push_str(&format!("Stage {}:\n", stage.stage_id));
99+
result.push_str(&format!(" Partition Groups: {:?}\n", stage.partition_groups));
100+
result.push_str(&format!(" Full Partitions: {}\n", stage.full_partitions));
101+
result.push_str(" Plan:\n");
102+
let plan_display = format!("{}", displayable(stage.plan.as_ref()).indent(true));
103+
for line in plan_display.lines() {
104+
result.push_str(&format!(" {}\n", line));
105+
}
106+
if i < stages.len() - 1 {
107+
result.push('\n');
108+
}
109+
}
110+
if result.is_empty() {
111+
result.push_str("No distributed stages generated");
112+
}
113+
result
114+
}
115+
}
116+
117+
impl DisplayAs for DistributedExplainExec {
118+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
119+
write!(f, "DistributedExplainExec")
120+
}
121+
}
122+
123+
impl ExecutionPlan for DistributedExplainExec {
124+
fn name(&self) -> &str {
125+
"DistributedExplainExec"
126+
}
127+
128+
fn as_any(&self) -> &dyn Any {
129+
self
130+
}
131+
132+
fn properties(&self) -> &PlanProperties {
133+
&self.properties
134+
}
135+
136+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
137+
vec![]
138+
}
139+
140+
fn with_new_children(
141+
self: Arc<Self>,
142+
_children: Vec<Arc<dyn ExecutionPlan>>,
143+
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
144+
Ok(self)
145+
}
146+
147+
fn execute(
148+
&self,
149+
_partition: usize,
150+
_context: Arc<TaskContext>,
151+
) -> datafusion::error::Result<SendableRecordBatchStream> {
152+
let schema = self.schema.clone();
153+
154+
// Create the result data with our 4 plan types
155+
let plan_types = StringArray::from(vec![
156+
"logical_plan",
157+
"physical_plan",
158+
"distributed_plan",
159+
"distributed_stages"
160+
]);
161+
let plans = StringArray::from(vec![
162+
self.logical_plan.as_str(),
163+
self.physical_plan.as_str(),
164+
self.distributed_plan.as_str(),
165+
self.distributed_stages.as_str(),
166+
]);
167+
168+
let batch = RecordBatch::try_new(
169+
schema.clone(),
170+
vec![Arc::new(plan_types), Arc::new(plans)],
171+
).map_err(|e| datafusion::error::DataFusionError::ArrowError(e, None))?;
172+
173+
// Use MemoryStream which is designed for DataFusion execution plans
174+
let stream = MemoryStream::try_new(vec![batch], schema, None)?;
175+
176+
Ok(Box::pin(stream))
177+
}
178+
179+
fn schema(&self) -> SchemaRef {
180+
self.schema.clone()
181+
}
182+
}
183+
184+
/// Check if this is an EXPLAIN query (but not EXPLAIN ANALYZE)
185+
///
186+
/// This function distinguishes between:
187+
/// - EXPLAIN queries (returns true) - show plan information only
188+
/// - EXPLAIN ANALYZE queries (returns false) - execute and show runtime stats
189+
/// - Regular queries (returns false) - normal query execution
190+
pub fn is_explain_query(query: &str) -> bool {
191+
let query_upper = query.trim().to_uppercase();
192+
// Must start with "EXPLAIN" followed by whitespace or end of string
193+
let is_explain = query_upper.starts_with("EXPLAIN") &&
194+
(query_upper.len() == 7 || query_upper.chars().nth(7).is_some_and(|c| c.is_whitespace()));
195+
let is_explain_analyze = query_upper.starts_with("EXPLAIN ANALYZE");
196+
is_explain && !is_explain_analyze
197+
}
198+
199+
#[cfg(test)]
200+
mod tests {
201+
use super::*;
202+
203+
#[test]
204+
fn test_is_explain_query() {
205+
// Test EXPLAIN queries (should return true)
206+
assert!(is_explain_query("EXPLAIN SELECT * FROM table"));
207+
assert!(is_explain_query("explain select * from table"));
208+
assert!(is_explain_query(" EXPLAIN SELECT 1"));
209+
assert!(is_explain_query("EXPLAIN\nSELECT * FROM test"));
210+
211+
// Test EXPLAIN ANALYZE queries (should return false)
212+
assert!(!is_explain_query("EXPLAIN ANALYZE SELECT * FROM table"));
213+
assert!(!is_explain_query("explain analyze SELECT * FROM table"));
214+
assert!(!is_explain_query(" EXPLAIN ANALYZE SELECT 1"));
215+
216+
// Test regular queries (should return false)
217+
assert!(!is_explain_query("SELECT * FROM table"));
218+
assert!(!is_explain_query("INSERT INTO table VALUES (1)"));
219+
assert!(!is_explain_query("UPDATE table SET col = 1"));
220+
assert!(!is_explain_query("DELETE FROM table"));
221+
assert!(!is_explain_query("CREATE TABLE test (id INT)"));
222+
223+
// Test edge cases
224+
assert!(!is_explain_query(""));
225+
assert!(!is_explain_query(" "));
226+
assert!(!is_explain_query("EXPLAINSELECT")); // No space
227+
assert!(is_explain_query("EXPLAIN")); // Just EXPLAIN
228+
}
229+
}

0 commit comments

Comments
 (0)