88// the Business Source License, use of this software will be governed
99// by the Apache License, Version 2.0.
1010
11+ use std:: pin:: Pin ;
12+ use std:: task:: { Context , Poll } ;
13+
1114use bytes:: { Bytes , BytesMut } ;
1215use bytestring:: ByteString ;
1316use datafusion:: arrow:: ipc:: writer:: StreamWriter ;
1417use datafusion:: error:: DataFusionError ;
15- use futures:: StreamExt ;
1618use futures:: stream:: BoxStream ;
19+ use futures:: { Stream , StreamExt } ;
1720use tonic:: codec:: CompressionEncoding ;
1821use tonic:: { Request , Response , Status , async_trait} ;
1922use tracing:: info;
@@ -25,14 +28,15 @@ use restate_core::protobuf::cluster_ctrl_svc::{
2528 CreatePartitionSnapshotResponse , DescribeLogRequest , DescribeLogResponse , FindTailRequest ,
2629 FindTailResponse , GetClusterConfigurationRequest , GetClusterConfigurationResponse ,
2730 ListLogsRequest , ListLogsResponse , MigrateMetadataRequest , MigrateMetadataResponse ,
28- QueryRequest , QueryResponse , SealAndExtendChainRequest , SealAndExtendChainResponse ,
29- SealChainRequest , SealChainResponse , SealedSegment , SetClusterConfigurationRequest ,
30- SetClusterConfigurationResponse , TailState , TrimLogRequest ,
31+ QueryRequest , QueryResponse , QueryWarning , SealAndExtendChainRequest ,
32+ SealAndExtendChainResponse , SealChainRequest , SealChainResponse , SealedSegment ,
33+ SetClusterConfigurationRequest , SetClusterConfigurationResponse , TailState , TrimLogRequest ,
3134 cluster_ctrl_svc_server:: { ClusterCtrlSvc , ClusterCtrlSvcServer } ,
3235} ;
3336use restate_core:: { Metadata , MetadataWriter } ;
3437use restate_metadata_store:: WriteError ;
3538use restate_storage_query_datafusion:: context:: QueryContext ;
39+ use restate_storage_query_datafusion:: node_fan_out:: NodeWarnings ;
3640use restate_types:: config:: { MetadataClientKind , MetadataClientOptions , NetworkingOptions } ;
3741use restate_types:: identifiers:: PartitionId ;
3842use restate_types:: logs:: metadata:: { Logs , SegmentIndex } ;
@@ -419,21 +423,37 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
419423 request : Request < QueryRequest > ,
420424 ) -> std:: result:: Result < Response < Self :: QueryStream > , tonic:: Status > {
421425 let request = request. into_inner ( ) ;
422- let stream = self
426+ let query_result = self
423427 . query_context
424428 . execute ( & request. query )
425429 . await
426430 . map_err ( datafusion_error_to_status) ?;
427431
428- Ok ( Response :: new (
429- WriteRecordBatchStream :: < StreamWriter < Vec < u8 > > > :: new ( stream, request. query )
430- . map_err ( datafusion_error_to_status) ?
431- . map ( |item| {
432- item. map ( |encoded| QueryResponse { encoded } )
433- . map_err ( datafusion_error_to_status)
434- } )
435- . boxed ( ) ,
436- ) )
432+ let node_warnings = query_result. node_warnings ;
433+
434+ let data_stream = WriteRecordBatchStream :: < StreamWriter < Vec < u8 > > > :: new (
435+ query_result. stream ,
436+ request. query ,
437+ )
438+ . map_err ( datafusion_error_to_status) ?
439+ . map ( |item| {
440+ item. map ( |encoded| QueryResponse {
441+ encoded,
442+ ..Default :: default ( )
443+ } )
444+ . map_err ( datafusion_error_to_status)
445+ } ) ;
446+
447+ // Wrap the data stream to attach per-node warnings to the final
448+ // response message, avoiding an extra trailing empty-data message.
449+ let stream = QueryWarningStream {
450+ inner : data_stream. boxed ( ) ,
451+ node_warnings,
452+ last_response : None ,
453+ done : false ,
454+ } ;
455+
456+ Ok ( Response :: new ( stream. boxed ( ) ) )
437457 }
438458
439459 /// Migrate metadata from the current metadata store to a target store
@@ -539,6 +559,72 @@ fn serialize_value<T: StorageEncode>(value: &T) -> Bytes {
539559 buf. freeze ( )
540560}
541561
562+ /// Stream wrapper that buffers the last response from the inner stream, and
563+ /// when the inner stream ends, attaches any accumulated per-node warnings
564+ /// to that final response before yielding it.
565+ struct QueryWarningStream {
566+ inner : BoxStream < ' static , Result < QueryResponse , Status > > ,
567+ node_warnings : Vec < NodeWarnings > ,
568+ last_response : Option < Result < QueryResponse , Status > > ,
569+ done : bool ,
570+ }
571+
572+ impl Stream for QueryWarningStream {
573+ type Item = Result < QueryResponse , Status > ;
574+
575+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
576+ if self . done {
577+ return Poll :: Ready ( None ) ;
578+ }
579+
580+ loop {
581+ match self . inner . poll_next_unpin ( cx) {
582+ Poll :: Pending => return Poll :: Pending ,
583+ Poll :: Ready ( None ) => {
584+ self . done = true ;
585+ // Inner stream ended. Yield the buffered last response
586+ // with warnings attached, or a warnings-only response.
587+ let warnings = drain_node_warnings ( & self . node_warnings ) ;
588+ return match self . last_response . take ( ) {
589+ Some ( Ok ( mut resp) ) => {
590+ resp. warnings = warnings;
591+ Poll :: Ready ( Some ( Ok ( resp) ) )
592+ }
593+ Some ( Err ( err) ) => Poll :: Ready ( Some ( Err ( err) ) ) ,
594+ None if !warnings. is_empty ( ) => {
595+ // No data at all, but we have warnings
596+ Poll :: Ready ( Some ( Ok ( QueryResponse {
597+ encoded : Default :: default ( ) ,
598+ warnings,
599+ } ) ) )
600+ }
601+ None => Poll :: Ready ( None ) ,
602+ } ;
603+ }
604+ Poll :: Ready ( Some ( item) ) => {
605+ // Yield the previously buffered response, buffer this one
606+ if let Some ( prev) = self . last_response . replace ( item) {
607+ return Poll :: Ready ( Some ( prev) ) ;
608+ }
609+ // First item — buffer it and poll for the next
610+ continue ;
611+ }
612+ }
613+ }
614+ }
615+ }
616+
617+ fn drain_node_warnings ( node_warnings : & [ NodeWarnings ] ) -> Vec < QueryWarning > {
618+ let mut out = Vec :: new ( ) ;
619+ for nw in node_warnings {
620+ out. extend ( nw. lock ( ) . drain ( ..) . map ( |w| QueryWarning {
621+ node_id : w. node_id ,
622+ message : w. message ,
623+ } ) ) ;
624+ }
625+ out
626+ }
627+
542628fn datafusion_error_to_status ( err : DataFusionError ) -> Status {
543629 match err {
544630 DataFusionError :: SQL ( ..)
0 commit comments