Skip to content

Commit 8b2be08

Browse files
execution_plans: add metrics collector and re-writer
This change introduces new structs / concepts: 1. MetricsCollector Collects metrics from an instance of `StageExec` (which is a task) using a pre-order traversal. It stops at any `ArrowFlightReadExec` nodes and collects child task metrics from them, if there are any. 2. MetricsWrapperExec A new `ExecutionPlan` node which cannot be executed. It wraps a "real" `ExecutionPlan` node and stores metrics. This let's you "override" the `metrics()` method on `ExecutionPlan` nodes. This override applies when displaying the plan as well. This struct is private. 3. Task MetricsRewriter Rewrites a task by wrapping each node in a `MetricsWrapperExec`. Informs #123.
1 parent ae4a4c2 commit 8b2be08

File tree

3 files changed

+520
-0
lines changed

3 files changed

+520
-0
lines changed

src/execution_plans/arrow_flight_read.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ use crate::config_extension_ext::ContextGrpcMetadata;
33
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
44
use crate::execution_plans::StageExec;
55
use crate::flight_service::DoGet;
6+
use crate::metrics::proto::MetricsSetProto;
67
use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
78
use crate::ChannelResolver;
89
use arrow_flight::decode::FlightRecordBatchStream;
910
use arrow_flight::error::FlightError;
1011
use arrow_flight::flight_service_client::FlightServiceClient;
1112
use arrow_flight::Ticket;
13+
use dashmap::DashMap;
1214
use datafusion::arrow::datatypes::SchemaRef;
1315
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
1416
use datafusion::error::DataFusionError;
@@ -56,6 +58,13 @@ pub struct ArrowFlightReadReadyExec {
5658
/// the properties we advertise for this execution plan
5759
properties: PlanProperties,
5860
pub(crate) stage_num: usize,
61+
/// metrics_collection is used to collect metrics from child tasks. It is empty when an ArrowFlightReadReadyExec is instansiated
62+
/// (deserialized, created via [ArrowFlightReadExec::new_ready] etc). Metrics are populated in this map via [ArrowFlightReadExec::execute].
63+
///
64+
/// An instance may recieve metrics for 0 to N child tasks, where N is the number of tasks in the stage it is reading from.
65+
/// This is because, by convention, the ArrowFlightEndpoint sends metrics for a task to the last ArrowFlightReadExec to read from it, which
66+
/// may or may not be this instance.
67+
pub(super) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
5968
}
6069

6170
impl ArrowFlightReadExec {
@@ -85,6 +94,7 @@ impl ArrowFlightReadExec {
8594
Self::Ready(ArrowFlightReadReadyExec {
8695
properties,
8796
stage_num,
97+
metrics_collection: Arc::new(DashMap::new()),
8898
})
8999
}
90100

0 commit comments

Comments
 (0)