Conversation

Collaborator

@jayshrivastava jayshrivastava commented Sep 27, 2025

flight_service: emit metrics from ArrowFlightEndpoint

This change updates the ArrowFlightEndpoint to collect metrics and emit them. When the last partition
in a task is finished, the ArrowFlightEndpoint collects metrics and emits them via the
TrailingFlightDataStream.

Previously, we would determine whether a partition was finished when the request first hit the endpoint. Now,
we do it on stream completion. This is crucial for metrics collection because we need to know that
the stream is exhausted, meaning that no data is flowing in the plan and metrics are not actively
being updated.

Since the ArrowFlightEndpoint now emits metrics and NetworkBoundary plan nodes collect metrics,
all coordinating StageExecs will now have the full collection of metrics for all tasks. This commit
adds integration-style tests that assert that the coordinator is receiving the full set of metrics.

Finally, this change refactors and unskips some old metrics tests. These tests were skipped because
the plans would change. Now, we use test utils to count the number of nodes, etc., to make these tests
more resilient to changes.

Follow-up work

  • Only collect metrics if a configuration is set in the SessionContext, removing extra overhead
  • Display metrics in the plan using EXPLAIN (ANALYZE) - consider using sqllogictest or similar
    to test the output

@jayshrivastava jayshrivastava force-pushed the js/metrics-protocol-integration branch 2 times, most recently from 8d54d00 to cbfa6da Compare September 29, 2025 18:14
@jayshrivastava jayshrivastava marked this pull request as ready for review September 29, 2025 19:00
@jayshrivastava jayshrivastava force-pushed the js/metrics-protocol-integration branch from cbfa6da to 543a085 Compare September 29, 2025 19:03
Collaborator

@gabotechs gabotechs left a comment

Looking good! 🚀 This seems to go in the right direction.

Collaborator

A thought unrelated to this PR:

This src/execution_plans module is dedicated to DataFusion ExecutionPlan implementations. If a file inside it does not provide an ExecutionPlan implementation, it should probably not live here alongside the other ExecutionPlan implementations.

Collaborator Author

This makes sense to me. I think it's okay to address in this PR. I pushed a commit 'refactor modules and files' to ensure that

  • all metrics related code is in the metrics module
  • only the MetricsWrapperExec lives in execution_plans

/// sends metrics for a task to the last NetworkCoalesceExec to read from it, which may or may
/// not be this instance.
pub(crate) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
pub(crate) child_task_metrics: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
Collaborator

@gabotechs gabotechs Sep 30, 2025

Nit: it would be consistent with the naming in other parts of the project to call this input_task_metrics.

Collaborator Author

Updated in latest commit

Comment on lines 143 to 144
// Creates a tonic response from a stream of record batches. Handles
// - RecordBatch to flight conversion partition tracking, stage eviction, and trailing metrics.
Collaborator

In Rust, documentation for functions is declared with triple ///. An immediate benefit is that IDEs are smart enough to display the docs when you hover over usages.

Comment on lines 147 to 148
stage: Arc<StageExec>,
stage_data: TaskData,
Collaborator

It seems like we are unnecessarily passing Arc<StageExec> twice. TaskData already contains Arc<StageExec>

Comment on lines 324 to 337

/// metrics_collection is used to collect metrics from child tasks. It is empty when a
/// [NetworkBoundary] is instantiated (deserialized, created via new() etc...).
/// Metrics are populated by executing() the [NetworkBoundary]. It's expected that the
/// collection is complete after the [NetworkBoundary] has been executed. It is undefined
/// what this returns during execution.
///
/// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks
/// in the stage it is reading from. This is because, by convention, the ArrowFlightEndpoint
/// sends metrics for a task to the last [NetworkBoundary] to read from it, which may or may
/// not be this instance.
fn metrics_collection(&self) -> Option<Arc<DashMap<StageKey, Vec<MetricsSetProto>>>> {
None
}
Collaborator

I think it might be better to not have this method here.

The only place where this is used is in:

let metrics_collection =
if let Some(node) = plan.as_any().downcast_ref::<NetworkShuffleExec>() {
node.metrics_collection()
.map(Some)
.ok_or(DataFusionError::Internal(
"could not collect metrics from NetworkShuffleExec".to_string(),
))
} else if let Some(node) = plan.as_any().downcast_ref::<NetworkCoalesceExec>() {
node.metrics_collection()
.map(Some)
.ok_or(DataFusionError::Internal(
"could not collect metrics from NetworkCoalesceExec".to_string(),
))
} else {
Ok(None)
}?;

And there, we are not making use of the NetworkBoundary trait, so this could perfectly just be normal methods in both NetworkShuffleExec and NetworkCoalesceExec.

It'd be nice to scope the functionality of NetworkBoundary as much as possible, leaving it exclusively dedicated to distributed planning; otherwise, unrelated features can creep in over time, bloating the trait and making the SOLID-principles gods unhappy because we are not complying with the I (interface segregation).

Collaborator Author

Done.

I do think that network boundaries should be responsible for metrics collection; unfortunately, we don't gain the benefit of putting metrics_collection() in the trait here because we can't cast from Arc<dyn ExecutionPlan> to NetworkBoundary. So, I've dropped the commit distributed_physical_optimizer_rule: move metrics_collection to NetworkBoundary trait

Collaborator

I see that the functions in this file are not really test utils used in different places of the codebase; they are only used in one place.

I'd advocate towards:

  • If a function is used only once, just place it where it's used
  • If a function is used twice or more, promote it to a "utils"/"helpers"/"common" module for reuse

Otherwise, "utils"/"helpers"/"common" modules will grow unbounded with functions that are not really commonly used.

Comment on lines 42 to 43
#[allow(dead_code)]
pub struct MetricsCollectorResult {
Collaborator

I think this is no longer dead code, right?

Collaborator Author

Done. Removed all unnecessary dead code markers.

.collect::<Result<Vec<MetricsSetProto>, FlightError>>()?;
result
.child_task_metrics
.insert(stage_key.clone(), proto_task_metrics.clone());
Collaborator

These two clones seem unnecessary:

Suggested change
.insert(stage_key.clone(), proto_task_metrics.clone());
.insert(stage_key, proto_task_metrics);

Collaborator Author
The reason will be displayed to describe this comment to others. Learn more.

Done. Good catch :)

df_metrics_set_to_proto(metrics)
.map_err(|err| FlightError::ProtocolError(err.to_string()))
})
.collect::<Result<Vec<MetricsSetProto>, FlightError>>()?;
Collaborator

If you prefer, you can even let Rust infer the types:

Suggested change
.collect::<Result<Vec<MetricsSetProto>, FlightError>>()?;
.collect::<Result<Vec<_>, _>>()?;

Collaborator Author

Done

}

#[cfg(test)]
mod tests {
Collaborator

It looks like several structs in this file are unused (just used in tests):

  • TaskMetricsRewriter is unused in production code; it's only used in the tests, so it should probably move there
  • same for MetricsWrapperExec: it's unused in production code, so a better place for it could be under mod tests {}

Collaborator Author

The rewriter and wrapper will both be used in production soon when I implement the display portion of explain analyze, so I think it's okay to leave them.

@jayshrivastava
Collaborator Author

TYFR @gabotechs :) I've addressed the comments

@jayshrivastava jayshrivastava force-pushed the js/metrics-protocol-integration branch 2 times, most recently from 6257e29 to 4453842 Compare September 30, 2025 14:53
Collaborator

@gabotechs gabotechs left a comment

Awesome work 💯

/// Creates a FlightData with the given app_metadata and empty RecordBatch using the provided schema.
/// We don't use [arrow_flight::encode::FlightDataEncoder] (and by extension, the [arrow_flight::encode::FlightDataEncoderBuilder])
/// since they skip messages with empty RecordBatch data.
pub fn empty_flight_data_with_app_metadata(
Collaborator

Same here: it's only used once, so it can be private:

Suggested change
pub fn empty_flight_data_with_app_metadata(
fn empty_flight_data_with_app_metadata(

}

#[tokio::test]
async fn test_metrics_collection_e2e_1() {
Collaborator

Nice! Any opinions on adding an integration test that runs, for example, a TPCH query and expects that all nodes have collected metrics?

It seems like these tests already provide good coverage, but just giving ideas in case you want to take it one step further.

Collaborator Author

Yes. I'll add these when I can display the full explain output and assert its contents. I'll probably need some way to obfuscate/hide the numbers since they are nondeterministic.

Comment on lines +15 to +27
/// TaskMetricsRewriter is used to enrich a task with metrics by re-writing the plan using [MetricsWrapperExec] nodes.
///
/// Ex. for a plan with the form
/// AggregateExec
/// └── ProjectionExec
/// └── NetworkShuffleExec
///
/// the task will be rewritten as
///
/// MetricsWrapperExec (wrapped: AggregateExec)
/// └── MetricsWrapperExec (wrapped: ProjectionExec)
/// └── NetworkShuffleExec
/// (Note that the NetworkShuffleExec node is not wrapped)
Collaborator

I know this is not specific to this PR, but here's a question just for my understanding:

How is it that we cannot rely on the existing mechanism in ExecutionPlans for storing metrics?

https://github.com/apache/datafusion/blob/986cfcde0ee2bb2cd2e8cc684a497896b278a2d9/datafusion/physical-plan/src/execution_plan.rs#L455-L457

Collaborator Author

The reason we don't is that these metrics are populated during execution (they cannot be written directly) and cannot be serialized. So, if a plan node runs remotely, there's no way to have a local copy of the same plan node return those metrics by calling metrics(). The workaround is to wrap the nodes.

It would be nice if we could use a more native method to inject metrics though if the ExecutionPlan interface were to allow it.

Closes: #94
Informs: #123
@jayshrivastava jayshrivastava force-pushed the js/metrics-protocol-integration branch from 9dab95c to 2163a73 Compare September 30, 2025 16:30
@jayshrivastava jayshrivastava merged commit 3fe4b08 into main Sep 30, 2025
4 checks passed
@jayshrivastava jayshrivastava deleted the js/metrics-protocol-integration branch September 30, 2025 16:38
@gabotechs gabotechs mentioned this pull request Oct 1, 2025
