-
Notifications
You must be signed in to change notification settings - Fork 151
Add Python bindings for accessing ExecutionMetrics #1381
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,8 @@ | |
| __all__ = [ | ||
| "ExecutionPlan", | ||
| "LogicalPlan", | ||
| "Metric", | ||
| "MetricsSet", | ||
| ] | ||
|
|
||
|
|
||
|
|
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes: | |
| Tables created in memory from record batches are currently not supported. | ||
| """ | ||
| return self._raw_plan.to_proto() | ||
|
|
||
| def metrics(self) -> MetricsSet | None: | ||
| """Return metrics for this plan node after execution, or None if unavailable.""" | ||
| raw = self._raw_plan.metrics() | ||
| if raw is None: | ||
| return None | ||
| return MetricsSet(raw) | ||
|
|
||
| def collect_metrics(self) -> list[tuple[str, MetricsSet]]: | ||
| """Walk the plan tree and collect metrics from all operators. | ||
|
|
||
| Returns a list of (operator_name, MetricsSet) tuples. | ||
|
||
| """ | ||
| result: list[tuple[str, MetricsSet]] = [] | ||
|
|
||
| def _walk(node: ExecutionPlan) -> None: | ||
| ms = node.metrics() | ||
| if ms is not None: | ||
| result.append((node.display(), ms)) | ||
| for child in node.children(): | ||
| _walk(child) | ||
|
|
||
| _walk(self) | ||
| return result | ||
|
|
||
|
|
||
| class MetricsSet: | ||
| """A set of metrics for a single execution plan operator. | ||
|
|
||
| Provides both individual metric access and convenience aggregations | ||
| across partitions. | ||
|
||
| """ | ||
|
|
||
| def __init__(self, raw: df_internal.MetricsSet) -> None: | ||
| """This constructor should not be called by the end user.""" | ||
| self._raw = raw | ||
|
|
||
| def metrics(self) -> list[Metric]: | ||
| """Return all individual metrics in this set.""" | ||
| return [Metric(m) for m in self._raw.metrics()] | ||
|
|
||
| @property | ||
| def output_rows(self) -> int | None: | ||
| """Sum of output_rows across all partitions.""" | ||
| return self._raw.output_rows() | ||
|
|
||
| @property | ||
| def elapsed_compute(self) -> int | None: | ||
| """Sum of elapsed_compute across all partitions, in nanoseconds.""" | ||
|
||
| return self._raw.elapsed_compute() | ||
|
|
||
| @property | ||
| def spill_count(self) -> int | None: | ||
| """Sum of spill_count across all partitions.""" | ||
|
||
| return self._raw.spill_count() | ||
|
|
||
| @property | ||
| def spilled_bytes(self) -> int | None: | ||
| """Sum of spilled_bytes across all partitions.""" | ||
| return self._raw.spilled_bytes() | ||
|
|
||
| @property | ||
| def spilled_rows(self) -> int | None: | ||
| """Sum of spilled_rows across all partitions.""" | ||
| return self._raw.spilled_rows() | ||
|
|
||
| def sum_by_name(self, name: str) -> int | None: | ||
| """Return the sum of metrics matching the given name.""" | ||
|
||
| return self._raw.sum_by_name(name) | ||
|
|
||
| def __repr__(self) -> str: | ||
| """Return a string representation of the metrics set.""" | ||
| return repr(self._raw) | ||
|
|
||
|
|
||
| class Metric: | ||
| """A single execution metric with name, value, partition, and labels.""" | ||
|
|
||
| def __init__(self, raw: df_internal.Metric) -> None: | ||
| """This constructor should not be called by the end user.""" | ||
| self._raw = raw | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| """The name of this metric (e.g. ``output_rows``).""" | ||
| return self._raw.name | ||
|
|
||
| @property | ||
| def value(self) -> int | None: | ||
| """The numeric value of this metric, or None for non-numeric types.""" | ||
| return self._raw.value | ||
|
||
|
|
||
| @property | ||
| def partition(self) -> int | None: | ||
| """The partition this metric applies to, or None if global.""" | ||
|
||
| return self._raw.partition | ||
|
|
||
| def labels(self) -> dict[str, str]: | ||
| """Return the labels associated with this metric.""" | ||
|
||
| return self._raw.labels() | ||
|
|
||
| def __repr__(self) -> str: | ||
| """Return a string representation of the metric.""" | ||
| return repr(self._raw) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,13 @@ | |
| # under the License. | ||
|
|
||
| import pytest | ||
| from datafusion import ExecutionPlan, LogicalPlan, SessionContext | ||
| from datafusion import ( | ||
| ExecutionPlan, | ||
| LogicalPlan, | ||
| Metric, | ||
| MetricsSet, | ||
| SessionContext, | ||
| ) | ||
|
|
||
|
|
||
| # Note: We must use CSV because memory tables are currently not supported for | ||
|
|
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None: | |
| execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes) | ||
|
|
||
| assert str(original_execution_plan) == str(execution_plan) | ||
|
|
||
|
|
||
| def test_metrics_tree_walk() -> None: | ||
| ctx = SessionContext() | ||
| ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')") | ||
| df = ctx.sql("SELECT * FROM t WHERE column1 > 1") | ||
| df.collect() | ||
| plan = df.execution_plan() | ||
|
|
||
| results = plan.collect_metrics() | ||
| assert len(results) >= 1 | ||
| found_metrics = False | ||
| for name, ms in results: | ||
| assert isinstance(name, str) | ||
| assert isinstance(ms, MetricsSet) | ||
| if ms.output_rows is not None and ms.output_rows > 0: | ||
| found_metrics = True | ||
| assert found_metrics | ||
|
|
||
|
|
||
| def test_metric_properties() -> None: | ||
| ctx = SessionContext() | ||
| ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')") | ||
| df = ctx.sql("SELECT * FROM t WHERE column1 > 1") | ||
| df.collect() | ||
| plan = df.execution_plan() | ||
|
|
||
| for _, ms in plan.collect_metrics(): | ||
| r = repr(ms) | ||
| assert isinstance(r, str) | ||
| for metric in ms.metrics(): | ||
| assert isinstance(metric, Metric) | ||
| assert isinstance(metric.name, str) | ||
| assert len(metric.name) > 0 | ||
| assert metric.partition is None or isinstance(metric.partition, int) | ||
| assert isinstance(metric.labels(), dict) | ||
| mr = repr(metric) | ||
| assert isinstance(mr, str) | ||
| assert len(mr) > 0 | ||
| return | ||
| pytest.skip("No metrics found") | ||
|
||
|
|
||
|
|
||
| def test_no_metrics_before_execution() -> None: | ||
| ctx = SessionContext() | ||
| ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)") | ||
| df = ctx.sql("SELECT * FROM t") | ||
| plan = df.execution_plan() | ||
| ms = plan.metrics() | ||
| assert ms is None or ms.output_rows is None or ms.output_rows == 0 | ||
|
||
|
|
||
|
|
||
| def test_collect_partitioned_metrics() -> None: | ||
| ctx = SessionContext() | ||
| ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')") | ||
| df = ctx.sql("SELECT * FROM t WHERE column1 > 1") | ||
|
|
||
| df.collect_partitioned() | ||
| plan = df.execution_plan() | ||
|
|
||
| found_metrics = False | ||
| for _, ms in plan.collect_metrics(): | ||
| if ms.output_rows is not None and ms.output_rows > 0: | ||
|
||
| found_metrics = True | ||
| assert found_metrics | ||
|
|
||
|
|
||
| def test_execute_stream_metrics() -> None: | ||
| ctx = SessionContext() | ||
| ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')") | ||
| df = ctx.sql("SELECT * FROM t WHERE column1 > 1") | ||
|
|
||
| for _ in df.execute_stream(): | ||
| pass | ||
|
|
||
| plan = df.execution_plan() | ||
| found_metrics = False | ||
| for _, ms in plan.collect_metrics(): | ||
| if ms.output_rows is not None and ms.output_rows > 0: | ||
| found_metrics = True | ||
| assert found_metrics | ||
|
|
||
|
|
||
| def test_execute_stream_partitioned_metrics() -> None: | ||
| ctx = SessionContext() | ||
| ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')") | ||
| df = ctx.sql("SELECT * FROM t WHERE column1 > 1") | ||
|
|
||
| for stream in df.execute_stream_partitioned(): | ||
| for _ in stream: | ||
| pass | ||
|
|
||
| plan = df.execution_plan() | ||
| found_metrics = False | ||
| for _, ms in plan.collect_metrics(): | ||
| if ms.output_rows is not None and ms.output_rows > 0: | ||
| found_metrics = True | ||
| assert found_metrics | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,13 @@ use pyo3::pybacked::PyBackedStr; | |
| use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods}; | ||
|
|
||
| use crate::common::data_type::PyScalarValue; | ||
| use datafusion::physical_plan::{ | ||
| ExecutionPlan as DFExecutionPlan, | ||
| collect as df_collect, | ||
| collect_partitioned as df_collect_partitioned, | ||
| execute_stream as df_execute_stream, | ||
| execute_stream_partitioned as df_execute_stream_partitioned, | ||
| }; | ||
| use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err}; | ||
| use crate::expr::PyExpr; | ||
| use crate::expr::sort_expr::{PySortExpr, to_sort_expressions}; | ||
|
|
@@ -289,6 +296,9 @@ pub struct PyDataFrame { | |
|
|
||
| // In IPython environment cache batches between __repr__ and _repr_html_ calls. | ||
| batches: SharedCachedBatches, | ||
|
|
||
| // Cache the last physical plan so that metrics are available after execution. | ||
| last_plan: Arc<Mutex<Option<Arc<dyn DFExecutionPlan>>>>, | ||
| } | ||
|
|
||
| impl PyDataFrame { | ||
|
|
@@ -297,6 +307,7 @@ impl PyDataFrame { | |
| Self { | ||
| df: Arc::new(df), | ||
| batches: Arc::new(Mutex::new(None)), | ||
| last_plan: Arc::new(Mutex::new(None)), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -626,7 +637,12 @@ impl PyDataFrame { | |
| /// Unless some order is specified in the plan, there is no | ||
| /// guarantee of the order of the result. | ||
| fn collect<'py>(&self, py: Python<'py>) -> PyResult<Vec<Bound<'py, PyAny>>> { | ||
| let batches = wait_for_future(py, self.df.as_ref().clone().collect())? | ||
| let df = self.df.as_ref().clone(); | ||
| let plan = wait_for_future(py, df.create_physical_plan())? | ||
| .map_err(PyDataFusionError::from)?; | ||
| *self.last_plan.lock() = Some(Arc::clone(&plan)); | ||
| let task_ctx = Arc::new(self.df.as_ref().task_ctx()); | ||
| let batches = wait_for_future(py, df_collect(plan, task_ctx))? | ||
|
||
| .map_err(PyDataFusionError::from)?; | ||
| // cannot use PyResult<Vec<RecordBatch>> return type due to | ||
| // https://github.com/PyO3/pyo3/issues/1813 | ||
|
|
@@ -642,7 +658,12 @@ impl PyDataFrame { | |
| /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch | ||
| /// maintaining the input partitioning. | ||
| fn collect_partitioned<'py>(&self, py: Python<'py>) -> PyResult<Vec<Vec<Bound<'py, PyAny>>>> { | ||
| let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())? | ||
| let df = self.df.as_ref().clone(); | ||
| let plan = wait_for_future(py, df.create_physical_plan())? | ||
| .map_err(PyDataFusionError::from)?; | ||
| *self.last_plan.lock() = Some(Arc::clone(&plan)); | ||
| let task_ctx = Arc::new(self.df.as_ref().task_ctx()); | ||
| let batches = wait_for_future(py, df_collect_partitioned(plan, task_ctx))? | ||
| .map_err(PyDataFusionError::from)?; | ||
|
|
||
| batches | ||
|
|
@@ -802,7 +823,13 @@ impl PyDataFrame { | |
| } | ||
|
|
||
| /// Get the execution plan for this `DataFrame` | ||
| /// | ||
| /// If the DataFrame has already been executed (e.g. via `collect()`), | ||
| /// returns the cached plan which includes populated metrics. | ||
| fn execution_plan(&self, py: Python) -> PyDataFusionResult<PyExecutionPlan> { | ||
| if let Some(plan) = self.last_plan.lock().as_ref() { | ||
| return Ok(PyExecutionPlan::new(Arc::clone(plan))); | ||
| } | ||
| let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())??; | ||
| Ok(plan.into()) | ||
|
Comment on lines
+847
to
851
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If you go the route of using the existing |
||
| } | ||
|
|
@@ -1127,13 +1154,22 @@ impl PyDataFrame { | |
|
|
||
| fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> { | ||
| let df = self.df.as_ref().clone(); | ||
| let stream = spawn_future(py, async move { df.execute_stream().await })?; | ||
| let plan = wait_for_future(py, df.create_physical_plan())??; | ||
| *self.last_plan.lock() = Some(Arc::clone(&plan)); | ||
| let task_ctx = Arc::new(self.df.as_ref().task_ctx()); | ||
| let stream = spawn_future(py, async move { df_execute_stream(plan, task_ctx) })?; | ||
| Ok(PyRecordBatchStream::new(stream)) | ||
| } | ||
|
|
||
| fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> { | ||
| let df = self.df.as_ref().clone(); | ||
| let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?; | ||
| let plan = wait_for_future(py, df.create_physical_plan())? | ||
| .map_err(PyDataFusionError::from)?; | ||
| *self.last_plan.lock() = Some(Arc::clone(&plan)); | ||
| let task_ctx = Arc::new(self.df.as_ref().task_ctx()); | ||
|
||
| let streams = spawn_future(py, async move { | ||
| df_execute_stream_partitioned(plan, task_ctx) | ||
| })?; | ||
| Ok(streams.into_iter().map(PyRecordBatchStream::new).collect()) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is leading me to think we should have some high level documentation, probably in the DataFrame page (or a subpage under it). Some of the things it would be good to do are to explain to a user what kinds of information they could find under these metrics and why that data are not available until after the DataFrame has been executed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, I think that would be super helpful. I can extend this to include a new user-facing RST page covering things like what metrics are, when they're available, how the physical plan tree maps to operators, etc.