Commit e428e7f

execution_plans: add metrics collector and re-writer (#144)
* protobuf: move StageKey to protobuf module

  The StageKey will be used by several modules because of metrics collection for EXPLAIN ANALYZE. Importing it from the protobuf module is better organized than importing it from flight_service, its old module.

* test_utils: add in memory channel resolver

  This change copies the in-memory channel resolver from the `examples` folder to `test_utils`, so distributed queries can be executed easily in unit tests without the overhead of network communication.

* test_utils: add utility to create test parquet tables from RecordBatch

  This change adds a module `session_context` to `test_utils` which allows you to
  - create temp parquet files from RecordBatch
  - register this data in the `SessionContext` under a table name

  This is handy for unit tests, where you typically want to execute a distributed SQL query on some small hard-coded `RecordBatch` data.

* execution_plans: add metrics collector and re-writer

  This change introduces new structs / concepts:

  1. MetricsCollector
     Collects metrics from an instance of `StageExec` (which is a task) using a pre-order traversal. It stops at any `ArrowFlightReadExec` nodes and collects child task metrics from them, if there are any.
  2. MetricsWrapperExec
     A new `ExecutionPlan` node which cannot be executed. It wraps a "real" `ExecutionPlan` node and stores metrics. This lets you "override" the `metrics()` method on `ExecutionPlan` nodes. The override applies when displaying the plan as well. This struct is private.
  3. Task MetricsRewriter
     Rewrites a task by wrapping each node in a `MetricsWrapperExec`.

  Informs #123.

* misc: remove println in test

* misc: fix lint in StageExec
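The collector and rewriter described in the commit message can be illustrated with a small, std-only sketch. Everything in it is an assumption made for illustration: the `Plan` enum is a hypothetical stand-in for DataFusion's `ExecutionPlan` tree, a single `u64` row count stands in for a metrics set, and `collect` / `rewrite` are not the commit's actual functions. It also assumes the flight-read node contributes no metrics of its own, which the commit message does not state.

```rust
/// Hypothetical, simplified plan node; stands in for a DataFusion `ExecutionPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A regular operator carrying its own metric (here, just a row count).
    Operator { name: &'static str, output_rows: u64, children: Vec<Plan> },
    /// Stands in for `ArrowFlightReadExec`: a task boundary holding metrics
    /// reported by child tasks (one inner Vec per child task).
    FlightRead { child_task_metrics: Vec<Vec<u64>> },
    /// Stands in for `MetricsWrapperExec`: keeps the operator's name and
    /// children but reports a stored metric instead of a live one.
    Wrapped { name: &'static str, stored_rows: u64, children: Vec<Plan> },
}

/// Result of one collection pass over a task.
#[derive(Debug, Default, PartialEq)]
struct Collected {
    /// Metrics of this task's own nodes, in pre-order.
    task_metrics: Vec<u64>,
    /// Metrics handed over by child tasks at flight-read boundaries.
    child_task_metrics: Vec<Vec<u64>>,
}

/// Pre-order traversal: record the node first, then descend into children.
/// Traversal stops at `FlightRead`, which only contributes child task metrics.
fn collect(plan: &Plan, out: &mut Collected) {
    match plan {
        Plan::Operator { output_rows, children, .. } => {
            out.task_metrics.push(*output_rows);
            for child in children {
                collect(child, out);
            }
        }
        Plan::Wrapped { stored_rows, children, .. } => {
            out.task_metrics.push(*stored_rows);
            for child in children {
                collect(child, out);
            }
        }
        Plan::FlightRead { child_task_metrics } => {
            out.child_task_metrics.extend(child_task_metrics.iter().cloned());
        }
    }
}

/// Wraps every operator with the next metric from `metrics`, consumed in the
/// same pre-order used by `collect`. Returns `None` if the metrics run out.
fn rewrite<I: Iterator<Item = u64>>(plan: &Plan, metrics: &mut I) -> Option<Plan> {
    match plan {
        Plan::Operator { name, children, .. } => {
            // Take this node's metric before visiting its children (pre-order).
            let stored_rows = metrics.next()?;
            let mut new_children = Vec::with_capacity(children.len());
            for child in children {
                new_children.push(rewrite(child, &mut *metrics)?);
            }
            Some(Plan::Wrapped { name: *name, stored_rows, children: new_children })
        }
        // Task boundaries and already-wrapped nodes are left untouched.
        other => Some(other.clone()),
    }
}
```

Because both functions walk the tree in the same order, a flat list produced by `collect` can be fed straight into `rewrite` without carrying node identifiers alongside the metrics.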
1 parent f8de1bd commit e428e7f

13 files changed: +734 -23 lines changed

src/execution_plans/arrow_flight_read.rs

Lines changed: 12 additions & 2 deletions
@@ -2,13 +2,15 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
 use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use crate::execution_plans::StageExec;
-use crate::flight_service::{DoGet, StageKey};
-use crate::protobuf::{proto_from_stage, DistributedCodec};
+use crate::flight_service::DoGet;
+use crate::metrics::proto::MetricsSetProto;
+use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
 use crate::ChannelResolver;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_client::FlightServiceClient;
 use arrow_flight::Ticket;
+use dashmap::DashMap;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
 use datafusion::error::DataFusionError;
@@ -56,6 +58,13 @@ pub struct ArrowFlightReadReadyExec {
     /// the properties we advertise for this execution plan
     properties: PlanProperties,
     pub(crate) stage_num: usize,
+    /// metrics_collection is used to collect metrics from child tasks. It is empty when an ArrowFlightReadReadyExec is instantiated
+    /// (deserialized, created via [ArrowFlightReadExec::new_ready] etc). Metrics are populated in this map via [ArrowFlightReadExec::execute].
+    ///
+    /// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks in the stage it is reading from.
+    /// This is because, by convention, the ArrowFlightEndpoint sends metrics for a task to the last ArrowFlightReadExec to read from it, which
+    /// may or may not be this instance.
+    pub(super) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
 }

 impl ArrowFlightReadExec {
@@ -85,6 +94,7 @@ impl ArrowFlightReadExec {
         Self::Ready(ArrowFlightReadReadyExec {
             properties,
             stage_num,
+            metrics_collection: Arc::new(DashMap::new()),
         })
     }
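The doc comment on `metrics_collection` describes a shared map that starts empty and is filled during execution, with one entry per child task that happened to report to this reader. A rough, std-only sketch of that pattern follows. Note the assumptions: `Arc<Mutex<HashMap<..>>>` stands in for `Arc<DashMap<..>>` so that the example needs no external crates, the `StageKey` fields and the `MetricsSet` struct are hypothetical placeholders for the real protobuf types, and `ReadNode` with its methods is invented for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the real `StageKey`; the field names are assumptions.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StageKey {
    query_id: String,
    stage_num: u64,
    task_number: u64,
}

/// Hypothetical stand-in for `MetricsSetProto`.
#[derive(Debug, Clone, PartialEq)]
struct MetricsSet {
    output_rows: u64,
}

/// `Arc<Mutex<HashMap>>` is used here in place of `Arc<DashMap>` to stay std-only.
type MetricsCollection = Arc<Mutex<HashMap<StageKey, Vec<MetricsSet>>>>;

/// Hypothetical reader node that owns a shared metrics map.
struct ReadNode {
    metrics_collection: MetricsCollection,
}

impl ReadNode {
    /// The map is empty at construction time, as in the doc comment.
    fn new() -> Self {
        Self { metrics_collection: Arc::new(Mutex::new(HashMap::new())) }
    }

    /// Stores the metrics reported by one child task under its key.
    fn record(&self, key: StageKey, metrics: Vec<MetricsSet>) {
        self.metrics_collection.lock().unwrap().insert(key, metrics);
    }

    /// Number of child tasks that reported to this node: anywhere from 0 to N.
    fn collected_tasks(&self) -> usize {
        self.metrics_collection.lock().unwrap().len()
    }
}
```

Wrapping the map in an `Arc` lets the streams spawned during execution hold a clone and write into the same map that the plan node later reads, which is presumably why the real field is an `Arc<DashMap<..>>` rather than a plain map.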