Skip to content

Commit e036a62

Browse files
authored
Implement metrics for HashJoinExec (apache#664)
1 parent a5b3a81 commit e036a62

File tree

1 file changed

+66
-37
lines changed

1 file changed

+66
-37
lines changed

datafusion/src/physical_plan/hash_join.rs

Lines changed: 66 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use super::{
6464
SendableRecordBatchStream,
6565
};
6666
use crate::physical_plan::coalesce_batches::concat_batches;
67-
use crate::physical_plan::PhysicalExpr;
67+
use crate::physical_plan::{PhysicalExpr, SQLMetric};
6868
use log::debug;
6969

7070
// Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
@@ -102,6 +102,35 @@ pub struct HashJoinExec {
102102
random_state: RandomState,
103103
/// Partitioning mode to use
104104
mode: PartitionMode,
105+
/// Metrics
106+
metrics: Arc<HashJoinMetrics>,
107+
}
108+
109+
/// Metrics for HashJoinExec.
///
/// One instance is shared (via `Arc`) between the `HashJoinExec` operator,
/// which exposes the values through `ExecutionPlan::metrics()`, and every
/// `HashJoinStream` it spawns, which updates them while executing.
#[derive(Debug)]
struct HashJoinMetrics {
    /// Total time for joining probe-side batches to the build-side batches.
    // NOTE(review): created via `SQLMetric::time_nanos()`, but the stream adds
    // `start.elapsed().as_millis()` — the unit (nanos vs millis) looks
    // inconsistent; confirm which is intended.
    join_time: Arc<SQLMetric>,
    /// Number of batches consumed by this operator (probe side)
    input_batches: Arc<SQLMetric>,
    /// Number of rows consumed by this operator (probe side)
    input_rows: Arc<SQLMetric>,
    /// Number of batches produced by this operator
    output_batches: Arc<SQLMetric>,
    /// Number of rows produced by this operator
    output_rows: Arc<SQLMetric>,
}
123+
124+
impl HashJoinMetrics {
125+
fn new() -> Self {
126+
Self {
127+
join_time: SQLMetric::time_nanos(),
128+
input_batches: SQLMetric::counter(),
129+
input_rows: SQLMetric::counter(),
130+
output_batches: SQLMetric::counter(),
131+
output_rows: SQLMetric::counter(),
132+
}
133+
}
105134
}
106135

107136
#[derive(Clone, Copy, Debug, PartialEq)]
@@ -154,6 +183,7 @@ impl HashJoinExec {
154183
build_side: Arc::new(Mutex::new(None)),
155184
random_state,
156185
mode: partition_mode,
186+
metrics: Arc::new(HashJoinMetrics::new()),
157187
})
158188
}
159189

@@ -394,6 +424,7 @@ impl ExecutionPlan for HashJoinExec {
394424
column_indices,
395425
self.random_state.clone(),
396426
visited_left_side,
427+
self.metrics.clone(),
397428
)))
398429
}
399430

@@ -412,6 +443,22 @@ impl ExecutionPlan for HashJoinExec {
412443
}
413444
}
414445
}
446+
447+
fn metrics(&self) -> HashMap<String, SQLMetric> {
448+
let mut metrics = HashMap::new();
449+
metrics.insert("joinTime".to_owned(), (*self.metrics.join_time).clone());
450+
metrics.insert(
451+
"inputBatches".to_owned(),
452+
(*self.metrics.input_batches).clone(),
453+
);
454+
metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
455+
metrics.insert(
456+
"outputBatches".to_owned(),
457+
(*self.metrics.output_batches).clone(),
458+
);
459+
metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
460+
metrics
461+
}
415462
}
416463

417464
/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
@@ -467,22 +514,14 @@ struct HashJoinStream {
467514
right: SendableRecordBatchStream,
468515
/// Information of index and left / right placement of columns
469516
column_indices: Vec<ColumnIndex>,
470-
/// number of input batches
471-
num_input_batches: usize,
472-
/// number of input rows
473-
num_input_rows: usize,
474-
/// number of batches produced
475-
num_output_batches: usize,
476-
/// number of rows produced
477-
num_output_rows: usize,
478-
/// total time for joining probe-side batches to the build-side batches
479-
join_time: usize,
480517
/// Random state used for hashing initialization
481518
random_state: RandomState,
482519
/// Keeps track of the left side rows whether they are visited
483520
visited_left_side: Vec<bool>, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
484521
/// There is nothing to process anymore and left side is processed in case of left join
485522
is_exhausted: bool,
523+
/// Metrics
524+
metrics: Arc<HashJoinMetrics>,
486525
}
487526

488527
#[allow(clippy::too_many_arguments)]
@@ -497,6 +536,7 @@ impl HashJoinStream {
497536
column_indices: Vec<ColumnIndex>,
498537
random_state: RandomState,
499538
visited_left_side: Vec<bool>,
539+
metrics: Arc<HashJoinMetrics>,
500540
) -> Self {
501541
HashJoinStream {
502542
schema,
@@ -506,14 +546,10 @@ impl HashJoinStream {
506546
left_data,
507547
right,
508548
column_indices,
509-
num_input_batches: 0,
510-
num_input_rows: 0,
511-
num_output_batches: 0,
512-
num_output_rows: 0,
513-
join_time: 0,
514549
random_state,
515550
visited_left_side,
516551
is_exhausted: false,
552+
metrics,
517553
}
518554
}
519555
}
@@ -1215,12 +1251,14 @@ impl Stream for HashJoinStream {
12151251
&self.column_indices,
12161252
&self.random_state,
12171253
);
1218-
self.num_input_batches += 1;
1219-
self.num_input_rows += batch.num_rows();
1254+
self.metrics.input_batches.add(1);
1255+
self.metrics.input_rows.add(batch.num_rows());
12201256
if let Ok((ref batch, ref left_side)) = result {
1221-
self.join_time += start.elapsed().as_millis() as usize;
1222-
self.num_output_batches += 1;
1223-
self.num_output_rows += batch.num_rows();
1257+
self.metrics
1258+
.join_time
1259+
.add(start.elapsed().as_millis() as usize);
1260+
self.metrics.output_batches.add(1);
1261+
self.metrics.output_rows.add(batch.num_rows());
12241262

12251263
match self.join_type {
12261264
JoinType::Left
@@ -1254,13 +1292,14 @@ impl Stream for HashJoinStream {
12541292
self.join_type != JoinType::Semi,
12551293
);
12561294
if let Ok(ref batch) = result {
1257-
self.num_input_batches += 1;
1258-
self.num_input_rows += batch.num_rows();
1295+
self.metrics.input_batches.add(1);
1296+
self.metrics.input_rows.add(batch.num_rows());
12591297
if let Ok(ref batch) = result {
1260-
self.join_time +=
1261-
start.elapsed().as_millis() as usize;
1262-
self.num_output_batches += 1;
1263-
self.num_output_rows += batch.num_rows();
1298+
self.metrics
1299+
.join_time
1300+
.add(start.elapsed().as_millis() as usize);
1301+
self.metrics.output_batches.add(1);
1302+
self.metrics.output_rows.add(batch.num_rows());
12641303
}
12651304
}
12661305
self.is_exhausted = true;
@@ -1274,16 +1313,6 @@ impl Stream for HashJoinStream {
12741313
| JoinType::Right => {}
12751314
}
12761315

1277-
// End of right batch, print stats in debug mode
1278-
debug!(
1279-
"Processed {} probe-side input batches containing {} rows and \
1280-
produced {} output batches containing {} rows in {} ms",
1281-
self.num_input_batches,
1282-
self.num_input_rows,
1283-
self.num_output_batches,
1284-
self.num_output_rows,
1285-
self.join_time
1286-
);
12871316
other
12881317
}
12891318
})

0 commit comments

Comments
 (0)