Skip to content

Commit 4ea0b3e

Browse files
committed
execution debugging
Signed-off-by: Alex Qyoun-ae <[email protected]>
1 parent 19a0ada commit 4ea0b3e

File tree

2 files changed

+43
-3
lines changed

2 files changed

+43
-3
lines changed

datafusion/core/src/physical_plan/hash_join.rs

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -298,6 +298,7 @@ impl ExecutionPlan for HashJoinExec {
298298
partition: usize,
299299
context: Arc<TaskContext>,
300300
) -> Result<SendableRecordBatchStream> {
301+
println!("HASH JOIN EXEC ENGAGED\n");
301302
// This is a hacky way to support type coercion for join expressions
302303
// Without this it would panic later, in build_join_indexes => equal_rows, when it would try to downcast both sides to same primitive type
303304
// TODO Remove this after rebasing on top of commit ac2e5d15 "Support type coercion for equijoin (#4666)". It was first released at DF 16.0
@@ -326,6 +327,7 @@ impl ExecutionPlan for HashJoinExec {
326327

327328
// we only want to compute the build side once for PartitionMode::CollectLeft
328329
let left_data = {
330+
println!("PARTITION MODE: {:#?}", self.mode);
329331
match self.mode {
330332
PartitionMode::CollectLeft => {
331333
let mut build_side = self.build_side.lock().await;
@@ -425,6 +427,7 @@ impl ExecutionPlan for HashJoinExec {
425427
concat_batches(&self.left.schema(), &batches, num_rows)?;
426428

427429
let left_side = Arc::new((hashmap, single_batch));
430+
println!("LEFT SIDE HASH JOIN:\n{:#?}\n", left_side);
428431

429432
debug!(
430433
"Built build-side {} of hash join containing {} rows in {} ms",
@@ -1086,7 +1089,12 @@ impl Stream for HashJoinStream {
10861089
JoinType::Inner | JoinType::Right => {}
10871090
}
10881091
}
1089-
Some(result.map(|x| x.0))
1092+
let result = Some(result.map(|x| x.0));
1093+
println!(
1094+
"HASH JOIN EXEC ISSUING BATCH 1:\n{:#?}\n",
1095+
result.as_ref().unwrap()
1096+
);
1097+
result
10901098
}
10911099
other => {
10921100
let timer = self.join_metrics.join_time.timer();
@@ -1115,6 +1123,7 @@ impl Stream for HashJoinStream {
11151123
}
11161124
timer.done();
11171125
self.is_exhausted = true;
1126+
println!("HASH JOIN EXEC ISSUING BATCH 2:\n{:#?}\n", result);
11181127
return Some(result);
11191128
}
11201129
JoinType::Left

datafusion/core/src/physical_plan/planner.rs

Lines changed: 33 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,6 @@ use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
3939
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
4040
use crate::physical_plan::cross_join::CrossJoinExec;
4141
use crate::physical_plan::explain::ExplainExec;
42-
use crate::physical_plan::expressions;
4342
use crate::physical_plan::expressions::{
4443
CaseExpr, Column, GetIndexedFieldExpr, Literal, PhysicalSortExpr,
4544
};
@@ -54,6 +53,7 @@ use crate::physical_plan::subquery::SubqueryExec;
5453
use crate::physical_plan::udf;
5554
use crate::physical_plan::udtf;
5655
use crate::physical_plan::windows::WindowAggExec;
56+
use crate::physical_plan::{expressions, DisplayFormatType};
5757
use crate::physical_plan::{join_utils, Partitioning};
5858
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
5959
use crate::scalar::ScalarValue;
@@ -74,6 +74,7 @@ use datafusion_physical_expr::expressions::{any, OuterColumn};
7474
use futures::future::BoxFuture;
7575
use futures::{FutureExt, StreamExt, TryStreamExt};
7676
use log::{debug, trace};
77+
use std::fmt::Debug;
7778
use std::sync::Arc;
7879

7980
fn create_function_physical_name(
@@ -434,15 +435,45 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
434435
logical_plan: &LogicalPlan,
435436
session_state: &SessionState,
436437
) -> Result<Arc<dyn ExecutionPlan>> {
437-
match self.handle_explain(logical_plan, session_state).await? {
438+
let x = match self.handle_explain(logical_plan, session_state).await? {
438439
Some(plan) => Ok(plan),
439440
None => {
440441
let plan = self
441442
.create_initial_plan(logical_plan, session_state)
442443
.await?;
443444
self.optimize_internal(plan, session_state, |_, _| {})
444445
}
446+
}?;
447+
448+
struct X {
449+
plan: Arc<dyn ExecutionPlan>,
450+
level: usize,
451+
}
452+
453+
impl Debug for X {
454+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455+
for _ in 0..self.level {
456+
write!(f, " ")?;
457+
}
458+
self.plan.fmt_as(DisplayFormatType::Default, f)?;
459+
writeln!(f)?;
460+
for child in self.plan.children() {
461+
let a = X {
462+
plan: Arc::clone(&child),
463+
level: self.level + 1,
464+
};
465+
write!(f, "{:?}", a)?;
466+
}
467+
Ok(())
468+
}
445469
}
470+
471+
let a = X {
472+
plan: Arc::clone(&x),
473+
level: 0,
474+
};
475+
println!("PHYSICAL PLAN:\n{:?}\n", a);
476+
Ok(x)
446477
}
447478

448479
/// Create a physical expression from a logical expression

0 commit comments

Comments
 (0)