Skip to content

Commit 1f654bb

Browse files
feat: implement metrics for AsyncFuncExec (#19626)
## Which issue does this PR close? - Closes #19623 ## Rationale for this change Explained in issue ## What changes are included in this PR? Implement baseline metrics for `AsyncFuncExec` operator ## Are these changes tested? Have added a test with PR ## Are there any user-facing changes? There are user facing changes, now `explain analyze` will show metrics of `AsyncFuncExec` too, do we need to update any documentation regarding this? Asking cause I couldn't find anything on a quick search. --------- Co-authored-by: Yongting You <[email protected]>
1 parent 166ef81 commit 1f654bb

File tree

2 files changed

+44
-6
lines changed

2 files changed

+44
-6
lines changed

datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,14 @@ use arrow::array::{Int32Array, RecordBatch, StringArray};
2121
use arrow::datatypes::{DataType, Field, Schema};
2222
use async_trait::async_trait;
2323
use datafusion::prelude::*;
24+
use datafusion_common::test_util::format_batches;
2425
use datafusion_common::{Result, assert_batches_eq};
2526
use datafusion_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
2627
use datafusion_expr::{
2728
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
2829
};
2930

30-
// This test checks the case where batch_size doesn't evenly divide
31-
// the number of rows.
32-
#[tokio::test]
33-
async fn test_async_udf_with_non_modular_batch_size() -> Result<()> {
31+
fn register_table_and_udf() -> Result<SessionContext> {
3432
let num_rows = 3;
3533
let batch_size = 2;
3634

@@ -59,6 +57,15 @@ async fn test_async_udf_with_non_modular_batch_size() -> Result<()> {
5957
.into_scalar_udf(),
6058
);
6159

60+
Ok(ctx)
61+
}
62+
63+
// This test checks the case where batch_size doesn't evenly divide
64+
// the number of rows.
65+
#[tokio::test]
66+
async fn test_async_udf_with_non_modular_batch_size() -> Result<()> {
67+
let ctx = register_table_and_udf()?;
68+
6269
let df = ctx
6370
.sql("SELECT id, test_async_udf(prompt) as result FROM test_table")
6471
.await?;
@@ -81,6 +88,31 @@ async fn test_async_udf_with_non_modular_batch_size() -> Result<()> {
8188
Ok(())
8289
}
8390

91+
// This test checks if metrics are printed for `AsyncFuncExec`
92+
#[tokio::test]
93+
async fn test_async_udf_metrics() -> Result<()> {
94+
let ctx = register_table_and_udf()?;
95+
96+
let df = ctx
97+
.sql(
98+
"EXPLAIN ANALYZE SELECT id, test_async_udf(prompt) as result FROM test_table",
99+
)
100+
.await?;
101+
102+
let result = df.collect().await?;
103+
104+
let explain_analyze_str = format_batches(&result)?.to_string();
105+
let async_func_exec_without_metrics =
106+
explain_analyze_str.split("\n").any(|metric_line| {
107+
metric_line.contains("AsyncFuncExec")
108+
&& !metric_line.contains("output_rows=3")
109+
});
110+
111+
assert!(!async_func_exec_without_metrics);
112+
113+
Ok(())
114+
}
115+
84116
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
85117
struct TestAsyncUDFImpl {
86118
batch_size: usize,

datafusion/physical-plan/src/async_func.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use datafusion_physical_expr::ScalarFunctionExpr;
3030
use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
3131
use datafusion_physical_expr::equivalence::ProjectionMapping;
3232
use datafusion_physical_expr::expressions::Column;
33+
use datafusion_physical_expr_common::metrics::{BaselineMetrics, RecordOutput};
3334
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3435
use futures::Stream;
3536
use futures::stream::StreamExt;
@@ -182,11 +183,14 @@ impl ExecutionPlan for AsyncFuncExec {
182183
context.session_id(),
183184
context.task_id()
184185
);
185-
// TODO figure out how to record metrics
186186

187187
// first execute the input stream
188188
let input_stream = self.input.execute(partition, Arc::clone(&context))?;
189189

190+
// TODO: Track `elapsed_compute` in `BaselineMetrics`
191+
// Issue: <https://github.com/apache/datafusion/issues/19658>
192+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
193+
190194
// now, for each record batch, evaluate the async expressions and add the columns to the result
191195
let async_exprs_captured = Arc::new(self.async_exprs.clone());
192196
let schema_captured = self.schema();
@@ -207,6 +211,7 @@ impl ExecutionPlan for AsyncFuncExec {
207211
let async_exprs_captured = Arc::clone(&async_exprs_captured);
208212
let schema_captured = Arc::clone(&schema_captured);
209213
let config_options = Arc::clone(&config_options_ref);
214+
let baseline_metrics_captured = baseline_metrics.clone();
210215

211216
async move {
212217
let batch = batch?;
@@ -219,7 +224,8 @@ impl ExecutionPlan for AsyncFuncExec {
219224
output_arrays.push(output.to_array(batch.num_rows())?);
220225
}
221226
let batch = RecordBatch::try_new(schema_captured, output_arrays)?;
222-
Ok(batch)
227+
228+
Ok(batch.record_output(&baseline_metrics_captured))
223229
}
224230
});
225231

0 commit comments

Comments
 (0)