Commit c5ac6cb

refactor modules and files
1 parent f8fbbe6 commit c5ac6cb

9 files changed: +674, -579 lines changed


src/execution_plans/metrics.rs

Lines changed: 3 additions & 571 deletions
Large diffs are not rendered by default.

src/execution_plans/mod.rs

Lines changed: 1 addition & 2 deletions
@@ -1,11 +1,10 @@
 mod metrics;
-mod metrics_collecting_stream;
 mod network_coalesce;
 mod network_shuffle;
 mod partition_isolator;
 mod stage;

-pub use metrics::collect_and_create_metrics_flight_data;
+pub use metrics::MetricsWrapperExec;
 pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
 pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
 pub use partition_isolator::PartitionIsolatorExec;

src/execution_plans/network_coalesce.rs

Lines changed: 1 addition & 1 deletion
@@ -3,9 +3,9 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::common::scale_partitioning_props;
 use crate::config_extension_ext::ContextGrpcMetadata;
 use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
-use crate::execution_plans::metrics_collecting_stream::MetricsCollectingStream;
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
+use crate::metrics::MetricsCollectingStream;
 use crate::metrics::proto::MetricsSetProto;
 use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};

src/execution_plans/network_shuffle.rs

Lines changed: 1 addition & 1 deletion
@@ -3,9 +3,9 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::common::scale_partitioning;
 use crate::config_extension_ext::ContextGrpcMetadata;
 use crate::distributed_physical_optimizer_rule::NetworkBoundary;
-use crate::execution_plans::metrics_collecting_stream::MetricsCollectingStream;
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
+use crate::metrics::MetricsCollectingStream;
 use crate::metrics::proto::MetricsSetProto;
 use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};

src/flight_service/do_get.rs

Lines changed: 81 additions & 4 deletions
@@ -1,14 +1,19 @@
 use crate::common::with_callback;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::execution_plans::{
-    DistributedTaskContext, StageExec, collect_and_create_metrics_flight_data,
-};
+use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::service::ArrowFlightEndpoint;
 use crate::flight_service::session_builder::DistributedSessionBuilderContext;
 use crate::flight_service::trailing_flight_data_stream::TrailingFlightDataStream;
+use crate::metrics::TaskMetricsCollector;
+use crate::metrics::proto::df_metrics_set_to_proto;
 use crate::protobuf::{
-    DistributedCodec, StageExecProto, StageKey, datafusion_error_to_tonic_status, stage_from_proto,
+    AppMetadata, DistributedCodec, FlightAppMetadata, MetricsCollection, StageExecProto, StageKey,
+    TaskMetrics, datafusion_error_to_tonic_status, stage_from_proto,
 };
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
+use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
+use arrow_flight::FlightData;
 use arrow_flight::Ticket;
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::error::FlightError;
@@ -17,6 +22,7 @@ use datafusion::common::exec_datafusion_err;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use futures::TryStreamExt;
+use futures::{Stream, stream};
 use prost::Message;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -203,6 +209,77 @@ fn flight_stream_from_record_batch_stream(
     })))
 }

+// Collects metrics from the provided stage and encodes it into a stream of flight data using
+// the schema of the stage.
+pub fn collect_and_create_metrics_flight_data(
+    stage_key: StageKey,
+    stage: Arc<StageExec>,
+) -> Result<impl Stream<Item = Result<FlightData, FlightError>> + Send + 'static, FlightError> {
+    // Get the metrics for the task executed on this worker. Separately, collect metrics for child tasks.
+    let mut result = TaskMetricsCollector::new()
+        .collect(stage.plan.clone())
+        .map_err(|err| FlightError::ProtocolError(err.to_string()))?;
+
+    // Add the metrics for this task into the collection of task metrics.
+    // Skip any metrics that can't be converted to proto (unsupported types)
+    let proto_task_metrics = result
+        .task_metrics
+        .iter()
+        .map(|metrics| {
+            df_metrics_set_to_proto(metrics)
+                .map_err(|err| FlightError::ProtocolError(err.to_string()))
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+    result
+        .child_task_metrics
+        .insert(stage_key, proto_task_metrics);
+
+    // Serialize the metrics for all tasks.
+    let mut task_metrics_set = vec![];
+    for (stage_key, metrics) in result.child_task_metrics.into_iter() {
+        task_metrics_set.push(TaskMetrics {
+            stage_key: Some(stage_key),
+            metrics,
+        });
+    }
+
+    let flight_app_metadata = FlightAppMetadata {
+        content: Some(AppMetadata::MetricsCollection(MetricsCollection {
+            tasks: task_metrics_set,
+        })),
+    };
+
+    let metrics_flight_data =
+        empty_flight_data_with_app_metadata(flight_app_metadata, stage.plan.schema())?;
+    Ok(Box::pin(stream::once(
+        async move { Ok(metrics_flight_data) },
+    )))
+}
+
+/// Creates a FlightData with the given app_metadata and empty RecordBatch using the provided schema.
+/// We don't use [arrow_flight::encode::FlightDataEncoder] (and by extension, the [arrow_flight::encode::FlightDataEncoderBuilder])
+/// since they skip messages with empty RecordBatch data.
+pub fn empty_flight_data_with_app_metadata(
+    metadata: FlightAppMetadata,
+    schema: SchemaRef,
+) -> Result<FlightData, FlightError> {
+    let mut buf = vec![];
+    metadata
+        .encode(&mut buf)
+        .map_err(|err| FlightError::ProtocolError(err.to_string()))?;
+
+    let empty_batch = RecordBatch::new_empty(schema);
+    let options = IpcWriteOptions::default();
+    let data_gen = IpcDataGenerator::default();
+    let mut dictionary_tracker = DictionaryTracker::new(true);
+    let (_, encoded_data) = data_gen
+        .encoded_batch(&empty_batch, &mut dictionary_tracker, &options)
+        .map_err(|e| {
+            FlightError::ProtocolError(format!("Failed to create empty batch FlightData: {e}"))
+        })?;
+    Ok(FlightData::from(encoded_data).with_app_metadata(buf))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
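
For orientation, here is a minimal consumer-side sketch of how the metrics attached above could be read back out of the trailing FlightData message. It assumes the prost-generated types `FlightAppMetadata`, `AppMetadata`, `MetricsCollection`, and `TaskMetrics` have the field layout referenced in the diff; `decode_task_metrics` is a hypothetical helper, not part of this commit.

use crate::protobuf::{AppMetadata, FlightAppMetadata};
use arrow_flight::FlightData;
use prost::Message;

// Hypothetical helper: decode the metrics payload that
// `collect_and_create_metrics_flight_data` stores in `app_metadata`.
fn decode_task_metrics(flight_data: &FlightData) -> Result<(), prost::DecodeError> {
    // `app_metadata` is a `Bytes` buffer holding the encoded FlightAppMetadata.
    let metadata = FlightAppMetadata::decode(flight_data.app_metadata.clone())?;
    if let Some(AppMetadata::MetricsCollection(collection)) = metadata.content {
        for task in collection.tasks {
            // Each TaskMetrics pairs an optional StageKey with the proto metric
            // sets collected for that task.
            println!(
                "stage key: {:?}, metric sets: {}",
                task.stage_key,
                task.metrics.len()
            );
        }
    }
    Ok(())
}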

src/metrics/mod.rs

Lines changed: 5 additions & 0 deletions
@@ -1 +1,6 @@
+mod metrics_collecting_stream;
 pub(crate) mod proto;
+mod task_metrics_collector;
+mod task_metrics_rewriter;
+pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
+pub(crate) use task_metrics_collector::TaskMetricsCollector;
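
A brief usage sketch of the relocated collector, following the call pattern shown in the do_get.rs diff above. The return shape (`task_metrics` plus `child_task_metrics` keyed by `StageKey`) comes from that diff; the error type and `summarize_metrics` helper are assumptions for illustration only.

use std::sync::Arc;

use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;

use crate::metrics::TaskMetricsCollector;

// Assumed shape: `collect` walks the plan and returns this task's metrics plus
// the metrics already gathered from child tasks, as consumed by
// `collect_and_create_metrics_flight_data` above. The error type is assumed
// to be DataFusionError.
fn summarize_metrics(plan: Arc<dyn ExecutionPlan>) -> Result<(), DataFusionError> {
    let result = TaskMetricsCollector::new().collect(plan)?;

    // Metrics recorded by operators executed in this task.
    println!("local metric sets: {}", result.task_metrics.len());

    // Metrics forwarded by child tasks over their Flight streams.
    for (stage_key, metrics) in result.child_task_metrics {
        println!("child task {stage_key:?}: {} metric sets", metrics.len());
    }
    Ok(())
}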
