
Commit 0c1ab37

execution debugging
Signed-off-by: Alex Qyoun-ae <[email protected]>
1 parent e3eb4cf commit 0c1ab37

2 files changed (+57, -3 lines)

datafusion/core/src/physical_plan/hash_join.rs

Lines changed: 10 additions & 1 deletion
@@ -298,6 +298,7 @@ impl ExecutionPlan for HashJoinExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        println!("HASH JOIN EXEC ENGAGED\n");
         // This is a hacky way to support type coercion for join expressions
         // Without this it would panic later, in build_join_indexes => equal_rows, when it would try to downcast both sides to same primitive type
         // TODO Remove this after rebasing on top of commit ac2e5d15 "Support type coercion for equijoin (#4666)". It was first released at DF 16.0
@@ -326,6 +327,7 @@ impl ExecutionPlan for HashJoinExec {
 
         // we only want to compute the build side once for PartitionMode::CollectLeft
         let left_data = {
+            println!("PARTITION MODE: {:#?}", self.mode);
             match self.mode {
                 PartitionMode::CollectLeft => {
                     let mut build_side = self.build_side.lock().await;
@@ -425,6 +427,7 @@ impl ExecutionPlan for HashJoinExec {
                     concat_batches(&self.left.schema(), &batches, num_rows)?;
 
                 let left_side = Arc::new((hashmap, single_batch));
+                println!("LEFT SIDE HASH JOIN:\n{:#?}\n", left_side);
 
                 debug!(
                     "Built build-side {} of hash join containing {} rows in {} ms",
@@ -1086,7 +1089,12 @@ impl Stream for HashJoinStream {
                             JoinType::Inner | JoinType::Right => {}
                         }
                     }
-                    Some(result.map(|x| x.0))
+                    let result = Some(result.map(|x| x.0));
+                    println!(
+                        "HASH JOIN EXEC ISSUING BATCH 1:\n{:#?}\n",
+                        result.as_ref().unwrap()
+                    );
+                    result
                 }
                 other => {
                     let timer = self.join_metrics.join_time.timer();
@@ -1115,6 +1123,7 @@
                         }
                         timer.done();
                         self.is_exhausted = true;
+                        println!("HASH JOIN EXEC ISSUING BATCH 2:\n{:#?}\n", result);
                         return Some(result);
                     }
                     JoinType::Left
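
The PARTITION MODE and LEFT SIDE prints above land in the CollectLeft path, where the build (left) side of the join is collected once behind an async mutex and then reused by every output partition. A minimal sketch of that compute-once pattern, assuming tokio is available and using a placeholder BuildSide type instead of DataFusion's actual hash-map-plus-batch pair:

use tokio::sync::Mutex;

/// Stand-in for the collected build side (the real plan stores the join
/// hash map plus the concatenated left-side RecordBatch).
type BuildSide = std::sync::Arc<Vec<u64>>;

struct CollectLeftJoin {
    build_side: Mutex<Option<BuildSide>>,
}

impl CollectLeftJoin {
    // Called once per output partition; only the first caller pays the
    // cost of building the left side, later callers clone the cached Arc.
    async fn left_data(&self) -> BuildSide {
        let mut guard = self.build_side.lock().await;
        match guard.as_ref() {
            Some(data) => std::sync::Arc::clone(data),
            None => {
                // pretend this is the expensive "collect the left input" step
                let built: BuildSide = std::sync::Arc::new((0..1_000).collect());
                *guard = Some(std::sync::Arc::clone(&built));
                built
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let join = CollectLeftJoin { build_side: Mutex::new(None) };
    // simulate several output partitions asking for the same build side
    for partition in 0..4 {
        let data = join.left_data().await;
        println!("partition {partition} sees {} build rows", data.len());
    }
}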

datafusion/core/src/physical_plan/planner.rs

Lines changed: 47 additions & 2 deletions
@@ -39,7 +39,6 @@ use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::cross_join::CrossJoinExec;
 use crate::physical_plan::explain::ExplainExec;
-use crate::physical_plan::expressions;
 use crate::physical_plan::expressions::{
     CaseExpr, Column, GetIndexedFieldExpr, Literal, PhysicalSortExpr,
 };
@@ -54,6 +53,7 @@ use crate::physical_plan::subquery::SubqueryExec;
 use crate::physical_plan::udf;
 use crate::physical_plan::udtf;
 use crate::physical_plan::windows::WindowAggExec;
+use crate::physical_plan::{expressions, DisplayFormatType};
 use crate::physical_plan::{join_utils, Partitioning};
 use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
 use crate::scalar::ScalarValue;
@@ -74,6 +74,7 @@ use datafusion_physical_expr::expressions::{any, OuterColumn};
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use log::{debug, trace};
+use std::fmt::Debug;
 use std::sync::Arc;
 
 fn create_function_physical_name(
@@ -434,15 +435,45 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
         logical_plan: &LogicalPlan,
         session_state: &SessionState,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match self.handle_explain(logical_plan, session_state).await? {
+        let x = match self.handle_explain(logical_plan, session_state).await? {
             Some(plan) => Ok(plan),
             None => {
                 let plan = self
                     .create_initial_plan(logical_plan, session_state)
                     .await?;
                 self.optimize_internal(plan, session_state, |_, _| {})
             }
+        }?;
+
+        struct X {
+            plan: Arc<dyn ExecutionPlan>,
+            level: usize,
+        }
+
+        impl Debug for X {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                for _ in 0..self.level {
+                    write!(f, " ")?;
+                }
+                self.plan.fmt_as(DisplayFormatType::Default, f)?;
+                writeln!(f)?;
+                for child in self.plan.children() {
+                    let a = X {
+                        plan: Arc::clone(&child),
+                        level: self.level + 1,
+                    };
+                    write!(f, "{:?}", a)?;
+                }
+                Ok(())
+            }
         }
+
+        let a = X {
+            plan: Arc::clone(&x),
+            level: 0,
+        };
+        println!("PHYSICAL PLAN:\n{:?}\n", a);
+        Ok(x)
     }
 
     /// Create a physical expression from a logical expression
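
The X wrapper added above walks ExecutionPlan::children() recursively and renders each node with fmt_as, indenting by depth. DataFusion also ships a displayable helper that produces a similar indented tree; the exact signature varies by release (newer versions pass a verbose flag to indent), so treat this as a hedged sketch rather than the API this branch is guaranteed to expose:

use datafusion::physical_plan::{displayable, ExecutionPlan};
use std::sync::Arc;

// Hypothetical debugging helper, assuming a DataFusion version where
// displayable(plan).indent() exists and returns something printable.
fn dump_plan(plan: &Arc<dyn ExecutionPlan>) {
    println!("PHYSICAL PLAN:\n{}", displayable(plan.as_ref()).indent());
}
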
@@ -645,8 +676,10 @@ impl DefaultPhysicalPlanner {
                 aggr_expr,
                 ..
             }) => {
+                println!("\nAGGREGATE INITIAL PLAN");
                 // Initially need to perform the aggregate and then merge the partitions
                 let input_exec = self.create_initial_plan(input, session_state).await?;
+                println!("INPUT EXEC: {:?}", input_exec);
                 let physical_input_schema = input_exec.schema();
                 let logical_input_schema = input.as_ref().schema();
 
@@ -664,6 +697,7 @@ impl DefaultPhysicalPlanner {
                         ))
                     })
                     .collect::<Result<Vec<_>>>()?;
+                println!("GROUPS: {:?}", groups);
                 let aggregates = aggr_expr
                     .iter()
                     .map(|e| {
@@ -675,6 +709,7 @@ impl DefaultPhysicalPlanner {
                         )
                     })
                     .collect::<Result<Vec<_>>>()?;
+                println!("AGGREGATES: {:?}", aggregates);
 
                 let initial_aggr = Arc::new(HashAggregateExec::try_new(
                     AggregateMode::Partial,
@@ -683,20 +718,27 @@ impl DefaultPhysicalPlanner {
                     input_exec,
                     physical_input_schema.clone(),
                 )?);
+                println!("INITIAL AGGR: {:?}", initial_aggr);
 
                 // update group column indices based on partial aggregate plan evaluation
                 let final_group: Vec<Arc<dyn PhysicalExpr>> = initial_aggr.output_group_expr();
+                println!("FINAL GROUP: {:?}", final_group);
 
                 // TODO: dictionary type not yet supported in Hash Repartition
                 let contains_dict = groups
                     .iter()
                     .flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
                     .any(|x| matches!(x, DataType::Dictionary(_, _)));
+                println!("CONTAINS DICT: {:?}", contains_dict);
 
                 let can_repartition = !groups.is_empty()
                     && session_state.config.target_partitions > 1
                     && session_state.config.repartition_aggregations
                     && !contains_dict;
+                println!("GROUPS IS EMPTY: {:?}", groups.is_empty());
+                println!("TARGET PARTITIONS: {:?}", session_state.config.target_partitions);
+                println!("REPARTITION AGGREGATIONS: {:?}", session_state.config.repartition_aggregations);
+                println!("CAN REPARTITION: {:?}", can_repartition);
 
                 let (initial_aggr, next_partition_mode): (
                     Arc<dyn ExecutionPlan>,
@@ -717,6 +759,9 @@ impl DefaultPhysicalPlanner {
                     // first aggregation and the expressions corresponding to the respective aggregate
                     (initial_aggr, AggregateMode::Final)
                 };
+                println!("INITIAL AGGR AFTER REPARTITION: {:?}", initial_aggr);
+                println!("NEXT PARTITION MODE: {:?}", next_partition_mode);
+                println!();
 
                 Ok(Arc::new(HashAggregateExec::try_new(
                     next_partition_mode,
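
Both files already pull in the log facade (planner.rs keeps use log::{debug, trace}; and hash_join.rs calls debug! for the build-side timing), so these temporary println! traces could instead be routed through debug! and filtered at runtime rather than recompiled away. Below is a sketch of the repartition-decision trace in that style; trace_repartition_decision is a hypothetical helper, and RUST_LOG filtering only takes effect if the binary or test harness installs a logger such as env_logger (an assumption here):

use log::debug;

// Same information as the println! calls above, but behind the log facade,
// e.g. enabled with RUST_LOG=debug when an env_logger-style logger is set up.
fn trace_repartition_decision(
    groups_is_empty: bool,
    target_partitions: usize,
    repartition_aggregations: bool,
    contains_dict: bool,
) {
    debug!("GROUPS IS EMPTY: {groups_is_empty}");
    debug!("TARGET PARTITIONS: {target_partitions}");
    debug!("REPARTITION AGGREGATIONS: {repartition_aggregations}");
    debug!(
        "CAN REPARTITION: {}",
        !groups_is_empty && target_partitions > 1 && repartition_aggregations && !contains_dict
    );
}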
