Skip to content

Commit 2d946a7

Browse files
execution_plans: add metrics collector and re-writer
This change introduces new structs / concepts: 1. MetricsCollector: Collects metrics from an instance of `StageExec` (which is a task) using a pre-order traversal. It stops at any `ArrowFlightReadExec` nodes and collects child task metrics from them, if there are any. 2. MetricsWrapperExec: A new `ExecutionPlan` node which cannot be executed. It wraps a "real" `ExecutionPlan` node and stores metrics. This lets you "override" the `metrics()` method on `ExecutionPlan` nodes. This override applies when displaying the plan as well. This struct is private. 3. Task MetricsRewriter: Rewrites a task by wrapping each node in a `MetricsWrapperExec`. Informs #123.
1 parent dcf7ed1 commit 2d946a7

File tree

3 files changed

+554
-0
lines changed

3 files changed

+554
-0
lines changed

src/execution_plans/arrow_flight_read.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@ use crate::config_extension_ext::ContextGrpcMetadata;
33
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
44
use crate::execution_plans::StageExec;
55
use crate::flight_service::DoGet;
6+
use crate::metrics::proto::MetricsSetProto;
67
use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
78
use crate::ChannelResolver;
89
use arrow_flight::decode::FlightRecordBatchStream;
910
use arrow_flight::error::FlightError;
1011
use arrow_flight::flight_service_client::FlightServiceClient;
1112
use arrow_flight::Ticket;
13+
use dashmap::DashMap;
1214
use datafusion::arrow::datatypes::SchemaRef;
15+
#[cfg(test)]
16+
use datafusion::arrow::record_batch::RecordBatch;
1317
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
1418
use datafusion::error::DataFusionError;
1519
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -56,6 +60,16 @@ pub struct ArrowFlightReadReadyExec {
5660
/// the properties we advertise for this execution plan
5761
properties: PlanProperties,
5862
pub(crate) stage_num: usize,
63+
/// metrics_collection is used to collect metrics from child tasks. It is empty when an ArrowFlightReadReadyExec is instantiated
64+
/// (deserialized, created via [ArrowFlightReadExec::new_ready] etc). Metrics are populated in this map via [ArrowFlightReadExec::execute].
65+
///
66+
/// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks in the stage it is reading from.
67+
/// This is because, by convention, the ArrowFlightEndpoint sends metrics for a task to the last ArrowFlightReadExec to read from it, which
68+
/// may or may not be this instance.
69+
pub(super) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
70+
/// Test-only field to provide record batches directly without network calls
71+
#[cfg(test)]
72+
pub(crate) test_data: Option<Vec<RecordBatch>>,
5973
}
6074

6175
impl ArrowFlightReadExec {
@@ -85,6 +99,9 @@ impl ArrowFlightReadExec {
8599
Self::Ready(ArrowFlightReadReadyExec {
86100
properties,
87101
stage_num,
102+
metrics_collection: Arc::new(DashMap::new()),
103+
#[cfg(test)]
104+
test_data: None,
88105
})
89106
}
90107

@@ -98,6 +115,20 @@ impl ArrowFlightReadExec {
98115
_ => internal_err!("ArrowFlightReadExec is already distributed"),
99116
}
100117
}
118+
119+
/// Test-only method to set record batches for direct execution
120+
#[cfg(test)]
121+
pub fn with_test_data(mut self, batches: Vec<RecordBatch>) -> Self {
122+
match &mut self {
123+
ArrowFlightReadExec::Ready(exec) => {
124+
exec.test_data = Some(batches);
125+
}
126+
ArrowFlightReadExec::Pending(_) => {
127+
panic!("Cannot set test data on pending ArrowFlightReadExec");
128+
}
129+
}
130+
self
131+
}
101132
}
102133

103134
impl DisplayAs for ArrowFlightReadExec {
@@ -151,6 +182,16 @@ impl ExecutionPlan for ArrowFlightReadExec {
151182
return exec_err!("ArrowFlightReadExec is not ready, was the distributed optimization step performed?");
152183
};
153184

185+
// Check for test data first
186+
#[cfg(test)]
187+
if let Some(test_batches) = &self_ready.test_data {
188+
use futures::stream;
189+
return Ok(Box::pin(RecordBatchStreamAdapter::new(
190+
self.schema(),
191+
stream::iter(test_batches.clone().into_iter().map(Ok)),
192+
)));
193+
}
194+
154195
// get the channel manager and current stage from our context
155196
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
156197

0 commit comments

Comments
 (0)