Collaborator

@jayshrivastava jayshrivastava commented Sep 18, 2025

protobuf: move StageKey to protobuf module

StageKey will be used by several modules for metrics collection in EXPLAIN ANALYZE. Importing it from the protobuf module is cleaner than importing it from flight_service, where it used to live.


test_utils: add utility to create test parquet tables from RecordBatch

This change adds a module session_context to test_utils which allows you to:

  • create temp parquet files from a RecordBatch
  • register this data in the SessionContext under a table name

This is handy for unit tests, where you typically want to execute a distributed SQL
query on some small, hard-coded RecordBatch data.


test_utils: add in memory channel resolver

This change copies the in-memory channel resolver from the examples folder to test_utils
so it is possible to execute distributed queries easily in unit tests without the overhead
of network communication.


execution_plans: add metrics collector and re-writer

This change introduces new structs / concepts:

  1. MetricsCollector

Collects metrics from an instance of StageExec (which is a task) using a
pre-order traversal. It stops at any ArrowFlightReadExec nodes and collects
child task metrics from them, if there are any.

  2. MetricsWrapperExec

A new ExecutionPlan node which cannot be executed. It wraps a "real" ExecutionPlan
node and stores metrics. This lets you "override" the metrics() method on
ExecutionPlan nodes. This override applies when displaying the plan as well.

This struct is private.

  3. TaskMetricsRewriter

Rewrites a task by wrapping each node in a MetricsWrapperExec.
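
To illustrate how the collector and the rewriter relate, here is a rough, std-only sketch. It is not the code in this PR: `Plan`, `Collected`, `collect`, `rewrite`, and `example_task` are hypothetical stand-ins, and a single `output_rows` number stands in for a real metrics set. It only models the two ideas described above: a pre-order traversal that stops at stage boundaries, and a rewrite that wraps each regular node so it reports overridden metrics.

```rust
// Hypothetical, simplified stand-ins for the PR's types. None of this is the
// real DataFusion API; it only models the traversal and the wrapping.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A regular operator that reports one metric of its own.
    Op { name: &'static str, output_rows: u64, children: Vec<Plan> },
    /// Stage boundary (stand-in for ArrowFlightReadExec) holding metrics
    /// reported by child tasks, if any.
    FlightRead { child_task_rows: Vec<u64> },
    /// Stand-in for MetricsWrapperExec: reports `output_rows` on behalf of
    /// the operator named `inner`.
    Wrapper { inner: &'static str, output_rows: u64, children: Vec<Plan> },
}

#[derive(Debug, Default, PartialEq)]
struct Collected {
    task_rows: Vec<u64>,
    child_task_rows: Vec<u64>,
}

/// Pre-order traversal: record a node's metric before visiting its children,
/// and stop descending at a stage boundary.
fn collect(plan: &Plan, out: &mut Collected) {
    match plan {
        Plan::Op { output_rows, children, .. }
        | Plan::Wrapper { output_rows, children, .. } => {
            out.task_rows.push(*output_rows);
            for child in children {
                collect(child, out);
            }
        }
        Plan::FlightRead { child_task_rows } => {
            out.child_task_rows.extend(child_task_rows.iter().copied());
        }
    }
}

/// Wraps every regular operator in a `Wrapper`, consuming `metrics` in the
/// same pre-order used by `collect`. Returns `None` if the metrics run out.
fn rewrite(plan: &Plan, metrics: &mut std::slice::Iter<'_, u64>) -> Option<Plan> {
    match plan {
        Plan::Op { name, children, .. } => {
            let output_rows = *metrics.next()?;
            let mut wrapped_children = Vec::with_capacity(children.len());
            for child in children {
                wrapped_children.push(rewrite(child, metrics)?);
            }
            Some(Plan::Wrapper { inner: *name, output_rows, children: wrapped_children })
        }
        // Stage boundaries and already-wrapped nodes are left untouched.
        other => Some(other.clone()),
    }
}

/// A two-operator task that reads from one child stage.
fn example_task() -> Plan {
    Plan::Op {
        name: "AggregateExec",
        output_rows: 10,
        children: vec![Plan::Op {
            name: "ProjectionExec",
            output_rows: 20,
            children: vec![Plan::FlightRead { child_task_rows: vec![7] }],
        }],
    }
}

fn main() {
    let task = example_task();
    let mut collected = Collected::default();
    collect(&task, &mut collected);
    println!("{collected:?}");

    let overrides: [u64; 2] = [100, 200];
    let rewritten = rewrite(&task, &mut overrides.iter()).expect("one metric per operator");
    println!("{rewritten:#?}");
}
```

Because both functions walk the tree in the same pre-order, the n-th collected metric lines up with the n-th wrapped node. The stage-boundary node is never wrapped.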

Informs #123.

@jayshrivastava jayshrivastava changed the title from "Js/metrics traversals" to "execution_plans: add metrics collector and re-writer" Sep 18, 2025
@jayshrivastava jayshrivastava marked this pull request as ready for review September 18, 2025 20:32
Collaborator

@NGA-TRAN NGA-TRAN left a comment

Looks great.

}

impl TaskMetricsCollector {
    #[allow(dead_code)]
Collaborator

Is this here because the function is not used yet and will be removed in the following PR?

Collaborator Author

Yes

/// MetricsWrapperExec (wrapped: AggregateExec)
///   └── MetricsWrapperExec (wrapped: ProjectionExec)
///         └── ArrowFlightReadExec
/// (Note that the ArrowFlightReadExec node is not wrapped)
Collaborator

Nice comment

/// A transparent wrapper that delegates all execution to its child but returns custom metrics. This node is invisible during display.
/// The structure of a plan tree is closely tied to the [TaskMetricsRewriter].
struct MetricsWrapperExec {
    wrapped: Arc<dyn ExecutionPlan>,
Collaborator

Nit: should we name this inner?

Collaborator Author

Done


// Create a plan with ArrowFlightReadExec leaf with test data
let arrow_flight_exec = ArrowFlightReadExec::new_ready(
    Partitioning::RoundRobinBatch(1),
Collaborator

Since we plan to remove round-robin partitioning in distributed mode, can we avoid using it in our tests and use hash repartitioning instead?

Collaborator Author

👍🏽

/// └── ProjectionExec
/// └── ArrowFlightReadExec (has metrics from stage 1)
/// Stage 1
/// └── EmptyExec
Collaborator

Is this a valid plan? Should we have a RepartitionExec in Stage 1 to be able to have Stage 2?

Collaborator

I saw that you added a RoundRobin repartition when you create the ArrowFlightReadExec below. Is that the one that will become the RepartitionExec here, or do we need to add an explicit RepartitionExec? I know it does not matter in this test, but we should not have invalid plans in our tests, to avoid future confusion.

Collaborator Author

Ah ok. I made an invalid plan. I'll try to generate a legitimate one.

Collaborator Author

Done

CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=11, elapsed_compute=11ns, start_timestamp=2025-09-18 13:00:11 UTC, end_timestamp=2025-09-18 13:00:12 UTC]
ProjectionExec: expr=[id@0 as id, name@1 as name], metrics=[output_rows=12, elapsed_compute=12ns, start_timestamp=2025-09-18 13:00:12 UTC, end_timestamp=2025-09-18 13:00:13 UTC]
ArrowFlightReadExec, metrics=[]
";
Collaborator

Very nice metric tests, with the seed providing a distinct value for each metric.

  • Can we put the r"LocalLimitExec: .... on a new line so it is easier to read?
  • Was the EmptyExec removed? For a unit test, this is good enough. Do you plan to add more integration tests in following PRs? I think we can use sqllogictest or something similar for those, so we can test many queries without a lot of mock functions, if that is easier for you. Actually, you can use this ticket to add more of those tests later. We want multiple stages, multiple tasks, and both simple and complicated queries to make sure all metrics are captured.

Collaborator Author

@jayshrivastava jayshrivastava Sep 19, 2025

> Was the EmptyExec removed?

When I rewrite a plan to enrich it with metrics, I only rewrite the task (StageExec::plan). So here, I only assert that the task has metrics.

> For unit test, it is good enough. Do you plan to add more integration tests in following PRs?

Yes. I will add integration tests once we can collect metrics across ArrowFlightReadExec boundaries. Right now we cannot, so I only test one stage.

> I think we can use sqllogictest or the like for those so we can test many queries without a lot of mock functions if that is easier for you. Actually, you can use this ticket for adding more of those tests later. We want multiple stages, multiple tasks and simple plus complicated queries to make sure all metrics are captured

Agreed. I plan to include such tests for sure 👍🏽

@jayshrivastava jayshrivastava force-pushed the js/metrics-traversals branch 2 times, most recently from 444b082 to cd41e02 Compare September 19, 2025 19:16
@jayshrivastava jayshrivastava merged commit e428e7f into main Sep 19, 2025
4 checks passed
@jayshrivastava jayshrivastava deleted the js/metrics-traversals branch September 19, 2025 19:33
3 participants