Skip to content

Commit 85a9251

Browse files
authored
Refactor StreamJoinMetrics to reuse BaselineMetrics (apache#16674)
* Refactor StreamJoinMetrics to reuse BaselineMetrics Signed-off-by: Alan Tang <jmtangcs@gmail.com> * use the record_poll method to update output rows Signed-off-by: Alan Tang <jmtangcs@gmail.com> --------- Signed-off-by: Alan Tang <jmtangcs@gmail.com>
1 parent aadb79b commit 85a9251

File tree

2 files changed

+7
-7
lines changed

2 files changed

+7
-7
lines changed

datafusion/physical-plan/src/joins/stream_join_utils.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::mem::size_of;
2323
use std::sync::Arc;
2424

2525
use crate::joins::utils::{JoinFilter, JoinHashMapType};
26-
use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
26+
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
2727
use crate::{metrics, ExecutionPlan};
2828

2929
use arrow::array::{
@@ -659,7 +659,7 @@ pub struct StreamJoinMetrics {
659659
/// Number of batches produced by this operator
660660
pub(crate) output_batches: metrics::Count,
661661
/// Number of rows produced by this operator
662-
pub(crate) output_rows: metrics::Count,
662+
pub(crate) baseline_metrics: BaselineMetrics,
663663
}
664664

665665
impl StreamJoinMetrics {
@@ -686,14 +686,12 @@ impl StreamJoinMetrics {
686686
let output_batches =
687687
MetricBuilder::new(metrics).counter("output_batches", partition);
688688

689-
let output_rows = MetricBuilder::new(metrics).output_rows(partition);
690-
691689
Self {
692690
left,
693691
right,
694692
output_batches,
695693
stream_memory_usage,
696-
output_rows,
694+
baseline_metrics: BaselineMetrics::new(metrics, partition),
697695
}
698696
}
699697
}

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,8 +1375,10 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
13751375
}
13761376
Some((batch, _)) => {
13771377
self.metrics.output_batches.add(1);
1378-
self.metrics.output_rows.add(batch.num_rows());
1379-
return Poll::Ready(Some(Ok(batch)));
1378+
return self
1379+
.metrics
1380+
.baseline_metrics
1381+
.record_poll(Poll::Ready(Some(Ok(batch))));
13801382
}
13811383
}
13821384
}

0 commit comments

Comments
 (0)