Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/execution_plans/arrow_flight_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::config_extension_ext::ContextGrpcMetadata;
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
use crate::execution_plans::StageExec;
use crate::flight_service::{DoGet, StageKey};
use crate::protobuf::{proto_from_stage, DistributedCodec};
use crate::flight_service::DoGet;
use crate::metrics::proto::MetricsSetProto;
use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
use crate::ChannelResolver;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;
use dashmap::DashMap;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
use datafusion::error::DataFusionError;
Expand Down Expand Up @@ -56,6 +58,13 @@ pub struct ArrowFlightReadReadyExec {
/// the properties we advertise for this execution plan
properties: PlanProperties,
pub(crate) stage_num: usize,
/// metrics_collection is used to collect metrics from child tasks. It is empty when an ArrowFlightReadReadyExec is instansiated
/// (deserialized, created via [ArrowFlightReadExec::new_ready] etc). Metrics are populated in this map via [ArrowFlightReadExec::execute].
///
/// An instance may recieve metrics for 0 to N child tasks, where N is the number of tasks in the stage it is reading from.
/// This is because, by convention, the ArrowFlightEndpoint sends metrics for a task to the last ArrowFlightReadExec to read from it, which
/// may or may not be this instance.
pub(super) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
}

impl ArrowFlightReadExec {
Expand Down Expand Up @@ -85,6 +94,7 @@ impl ArrowFlightReadExec {
Self::Ready(ArrowFlightReadReadyExec {
properties,
stage_num,
metrics_collection: Arc::new(DashMap::new()),
})
}

Expand Down
Loading