Skip to content

Commit 9eea08e

Browse files
refactor modules and files
1 parent 0200ce0 commit 9eea08e

File tree

9 files changed

+674
-579
lines changed

9 files changed

+674
-579
lines changed

src/execution_plans/metrics.rs

Lines changed: 3 additions & 571 deletions
Large diffs are not rendered by default.

src/execution_plans/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
mod metrics;
2-
mod metrics_collecting_stream;
32
mod network_coalesce;
43
mod network_shuffle;
54
mod partition_isolator;
65
mod stage;
76

8-
pub use metrics::collect_and_create_metrics_flight_data;
7+
pub use metrics::MetricsWrapperExec;
98
pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
109
pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
1110
pub use partition_isolator::PartitionIsolatorExec;

src/execution_plans/network_coalesce.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
33
use crate::common::scale_partitioning_props;
44
use crate::config_extension_ext::ContextGrpcMetadata;
55
use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
6-
use crate::execution_plans::metrics_collecting_stream::MetricsCollectingStream;
76
use crate::execution_plans::{DistributedTaskContext, StageExec};
87
use crate::flight_service::DoGet;
8+
use crate::metrics::MetricsCollectingStream;
99
use crate::metrics::proto::MetricsSetProto;
1010
use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
1111
use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};

src/execution_plans/network_shuffle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
33
use crate::common::scale_partitioning;
44
use crate::config_extension_ext::ContextGrpcMetadata;
55
use crate::distributed_physical_optimizer_rule::NetworkBoundary;
6-
use crate::execution_plans::metrics_collecting_stream::MetricsCollectingStream;
76
use crate::execution_plans::{DistributedTaskContext, StageExec};
87
use crate::flight_service::DoGet;
8+
use crate::metrics::MetricsCollectingStream;
99
use crate::metrics::proto::MetricsSetProto;
1010
use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
1111
use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};

src/flight_service/do_get.rs

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
use crate::config_extension_ext::ContextGrpcMetadata;
2-
use crate::execution_plans::{
3-
DistributedTaskContext, StageExec, collect_and_create_metrics_flight_data,
4-
};
2+
use crate::execution_plans::{DistributedTaskContext, StageExec};
53
use crate::flight_service::service::ArrowFlightEndpoint;
64
use crate::flight_service::session_builder::DistributedSessionBuilderContext;
75
use crate::flight_service::trailing_flight_data_stream::TrailingFlightDataStream;
6+
use crate::metrics::TaskMetricsCollector;
7+
use crate::metrics::proto::df_metrics_set_to_proto;
88
use crate::protobuf::{
9-
DistributedCodec, StageExecProto, StageKey, datafusion_error_to_tonic_status, stage_from_proto,
9+
AppMetadata, DistributedCodec, FlightAppMetadata, MetricsCollection, StageExecProto, StageKey,
10+
TaskMetrics, datafusion_error_to_tonic_status, stage_from_proto,
1011
};
12+
use arrow::array::RecordBatch;
13+
use arrow::datatypes::SchemaRef;
14+
use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
15+
use arrow_flight::FlightData;
1116
use arrow_flight::Ticket;
1217
use arrow_flight::encode::FlightDataEncoderBuilder;
1318
use arrow_flight::error::FlightError;
1419
use arrow_flight::flight_service_server::FlightService;
1520
use datafusion::common::exec_datafusion_err;
1621
use datafusion::execution::SendableRecordBatchStream;
1722
use futures::TryStreamExt;
23+
use futures::{Stream, stream};
1824
use prost::Message;
1925
use std::sync::Arc;
2026
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -186,6 +192,77 @@ fn flight_stream_from_record_batch_stream(
186192
})))
187193
}
188194

195+
// Collects metrics from the provided stage and encodes it into a stream of flight data using
196+
// the schema of the stage.
197+
pub fn collect_and_create_metrics_flight_data(
198+
stage_key: StageKey,
199+
stage: Arc<StageExec>,
200+
) -> Result<impl Stream<Item = Result<FlightData, FlightError>> + Send + 'static, FlightError> {
201+
// Get the metrics for the task executed on this worker. Separately, collect metrics for child tasks.
202+
let mut result = TaskMetricsCollector::new()
203+
.collect(stage.plan.clone())
204+
.map_err(|err| FlightError::ProtocolError(err.to_string()))?;
205+
206+
// Add the metrics for this task into the collection of task metrics.
207+
// Skip any metrics that can't be converted to proto (unsupported types)
208+
let proto_task_metrics = result
209+
.task_metrics
210+
.iter()
211+
.map(|metrics| {
212+
df_metrics_set_to_proto(metrics)
213+
.map_err(|err| FlightError::ProtocolError(err.to_string()))
214+
})
215+
.collect::<Result<Vec<_>, _>>()?;
216+
result
217+
.child_task_metrics
218+
.insert(stage_key, proto_task_metrics);
219+
220+
// Serialize the metrics for all tasks.
221+
let mut task_metrics_set = vec![];
222+
for (stage_key, metrics) in result.child_task_metrics.into_iter() {
223+
task_metrics_set.push(TaskMetrics {
224+
stage_key: Some(stage_key),
225+
metrics,
226+
});
227+
}
228+
229+
let flight_app_metadata = FlightAppMetadata {
230+
content: Some(AppMetadata::MetricsCollection(MetricsCollection {
231+
tasks: task_metrics_set,
232+
})),
233+
};
234+
235+
let metrics_flight_data =
236+
empty_flight_data_with_app_metadata(flight_app_metadata, stage.plan.schema())?;
237+
Ok(Box::pin(stream::once(
238+
async move { Ok(metrics_flight_data) },
239+
)))
240+
}
241+
242+
/// Creates a FlightData with the given app_metadata and empty RecordBatch using the provided schema.
243+
/// We don't use [arrow_flight::encode::FlightDataEncoder] (and by extension, the [arrow_flight::encode::FlightDataEncoderBuilder])
244+
/// since they skip messages with empty RecordBatch data.
245+
pub fn empty_flight_data_with_app_metadata(
246+
metadata: FlightAppMetadata,
247+
schema: SchemaRef,
248+
) -> Result<FlightData, FlightError> {
249+
let mut buf = vec![];
250+
metadata
251+
.encode(&mut buf)
252+
.map_err(|err| FlightError::ProtocolError(err.to_string()))?;
253+
254+
let empty_batch = RecordBatch::new_empty(schema);
255+
let options = IpcWriteOptions::default();
256+
let data_gen = IpcDataGenerator::default();
257+
let mut dictionary_tracker = DictionaryTracker::new(true);
258+
let (_, encoded_data) = data_gen
259+
.encoded_batch(&empty_batch, &mut dictionary_tracker, &options)
260+
.map_err(|e| {
261+
FlightError::ProtocolError(format!("Failed to create empty batch FlightData: {e}"))
262+
})?;
263+
Ok(FlightData::from(encoded_data).with_app_metadata(buf))
264+
}
265+
189266
#[cfg(test)]
190267
mod tests {
191268
use super::*;

src/metrics/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
1+
mod metrics_collecting_stream;
12
pub(crate) mod proto;
3+
mod task_metrics_collector;
4+
mod task_metrics_rewriter;
5+
pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
6+
pub(crate) use task_metrics_collector::TaskMetricsCollector;

0 commit comments

Comments
 (0)