11use crate :: common:: with_callback;
22use crate :: config_extension_ext:: ContextGrpcMetadata ;
3- use crate :: execution_plans:: {
4- DistributedTaskContext , StageExec , collect_and_create_metrics_flight_data,
5- } ;
3+ use crate :: execution_plans:: { DistributedTaskContext , StageExec } ;
64use crate :: flight_service:: service:: ArrowFlightEndpoint ;
75use crate :: flight_service:: session_builder:: DistributedSessionBuilderContext ;
86use crate :: flight_service:: trailing_flight_data_stream:: TrailingFlightDataStream ;
7+ use crate :: metrics:: TaskMetricsCollector ;
8+ use crate :: metrics:: proto:: df_metrics_set_to_proto;
99use crate :: protobuf:: {
10- DistributedCodec , StageKey , datafusion_error_to_tonic_status , stage_from_proto ,
10+ datafusion_error_to_tonic_status , stage_from_proto , AppMetadata , DistributedCodec , FlightAppMetadata , MetricsCollection , StageKey , TaskMetrics
1111} ;
12+ use arrow:: array:: RecordBatch ;
13+ use arrow:: datatypes:: SchemaRef ;
14+ use arrow:: ipc:: writer:: { DictionaryTracker , IpcDataGenerator , IpcWriteOptions } ;
15+ use arrow_flight:: FlightData ;
1216use arrow_flight:: Ticket ;
1317use arrow_flight:: encode:: FlightDataEncoderBuilder ;
1418use arrow_flight:: error:: FlightError ;
@@ -18,6 +22,7 @@ use datafusion::common::exec_datafusion_err;
1822use datafusion:: execution:: SendableRecordBatchStream ;
1923use datafusion:: physical_plan:: stream:: RecordBatchStreamAdapter ;
2024use futures:: TryStreamExt ;
25+ use futures:: { Stream , stream} ;
2126use prost:: Message ;
2227use std:: sync:: Arc ;
2328use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
@@ -204,6 +209,77 @@ fn flight_stream_from_record_batch_stream(
204209 } ) ) )
205210}
206211
212+ // Collects metrics from the provided stage and encodes it into a stream of flight data using
213+ // the schema of the stage.
214+ pub fn collect_and_create_metrics_flight_data (
215+ stage_key : StageKey ,
216+ stage : Arc < StageExec > ,
217+ ) -> Result < impl Stream < Item = Result < FlightData , FlightError > > + Send + ' static , FlightError > {
218+ // Get the metrics for the task executed on this worker. Separately, collect metrics for child tasks.
219+ let mut result = TaskMetricsCollector :: new ( )
220+ . collect ( stage. plan . clone ( ) )
221+ . map_err ( |err| FlightError :: ProtocolError ( err. to_string ( ) ) ) ?;
222+
223+ // Add the metrics for this task into the collection of task metrics.
224+ // Skip any metrics that can't be converted to proto (unsupported types)
225+ let proto_task_metrics = result
226+ . task_metrics
227+ . iter ( )
228+ . map ( |metrics| {
229+ df_metrics_set_to_proto ( metrics)
230+ . map_err ( |err| FlightError :: ProtocolError ( err. to_string ( ) ) )
231+ } )
232+ . collect :: < Result < Vec < _ > , _ > > ( ) ?;
233+ result
234+ . child_task_metrics
235+ . insert ( stage_key, proto_task_metrics) ;
236+
237+ // Serialize the metrics for all tasks.
238+ let mut task_metrics_set = vec ! [ ] ;
239+ for ( stage_key, metrics) in result. child_task_metrics . into_iter ( ) {
240+ task_metrics_set. push ( TaskMetrics {
241+ stage_key : Some ( stage_key) ,
242+ metrics,
243+ } ) ;
244+ }
245+
246+ let flight_app_metadata = FlightAppMetadata {
247+ content : Some ( AppMetadata :: MetricsCollection ( MetricsCollection {
248+ tasks : task_metrics_set,
249+ } ) ) ,
250+ } ;
251+
252+ let metrics_flight_data =
253+ empty_flight_data_with_app_metadata ( flight_app_metadata, stage. plan . schema ( ) ) ?;
254+ Ok ( Box :: pin ( stream:: once (
255+ async move { Ok ( metrics_flight_data) } ,
256+ ) ) )
257+ }
258+
259+ /// Creates a FlightData with the given app_metadata and empty RecordBatch using the provided schema.
260+ /// We don't use [arrow_flight::encode::FlightDataEncoder] (and by extension, the [arrow_flight::encode::FlightDataEncoderBuilder])
261+ /// since they skip messages with empty RecordBatch data.
262+ pub fn empty_flight_data_with_app_metadata (
263+ metadata : FlightAppMetadata ,
264+ schema : SchemaRef ,
265+ ) -> Result < FlightData , FlightError > {
266+ let mut buf = vec ! [ ] ;
267+ metadata
268+ . encode ( & mut buf)
269+ . map_err ( |err| FlightError :: ProtocolError ( err. to_string ( ) ) ) ?;
270+
271+ let empty_batch = RecordBatch :: new_empty ( schema) ;
272+ let options = IpcWriteOptions :: default ( ) ;
273+ let data_gen = IpcDataGenerator :: default ( ) ;
274+ let mut dictionary_tracker = DictionaryTracker :: new ( true ) ;
275+ let ( _, encoded_data) = data_gen
276+ . encoded_batch ( & empty_batch, & mut dictionary_tracker, & options)
277+ . map_err ( |e| {
278+ FlightError :: ProtocolError ( format ! ( "Failed to create empty batch FlightData: {e}" ) )
279+ } ) ?;
280+ Ok ( FlightData :: from ( encoded_data) . with_app_metadata ( buf) )
281+ }
282+
207283#[ cfg( test) ]
208284mod tests {
209285 use super :: * ;
0 commit comments