@@ -19,56 +19,78 @@ use std::task::{Context, Poll, ready};
1919use std:: time:: Instant ;
2020
2121use pin_project:: { pin_project, pinned_drop} ;
22- use quickwit_proto:: search:: LeafSearchResponse ;
22+ use quickwit_proto:: search:: { LeafSearchResponse , SearchResponse } ;
2323
2424use crate :: SearchError ;
2525use crate :: metrics:: SEARCH_METRICS ;
2626
27- // root
27+ // planning
2828
29- pub enum RootSearchMetricsStep {
30- Plan ,
31- Exec { num_targeted_splits : usize } ,
29+ /// Wrapper around the plan future to tracks error/cancellation metrics.
30+ /// Planning phase success isn't explicitely recorded as it can be deduced from
31+ /// the search phase metrics.
32+ #[ pin_project( PinnedDrop ) ]
33+ pub struct SearchPlanMetricsFuture < F > {
34+ #[ pin]
35+ pub tracked : F ,
36+ pub start : Instant ,
37+ pub is_success : Option < bool > ,
38+ }
39+
40+ #[ pinned_drop]
41+ impl < F > PinnedDrop for SearchPlanMetricsFuture < F > {
42+ fn drop ( self : Pin < & mut Self > ) {
43+ let status = match self . is_success {
44+ // this is a partial success, actual status will be recorded during the search step
45+ Some ( true ) => return ,
46+ Some ( false ) => "plan-error" ,
47+ None => "plan-cancelled" ,
48+ } ;
49+
50+ let label_values = [ status] ;
51+ SEARCH_METRICS
52+ . root_search_requests_total
53+ . with_label_values ( label_values)
54+ . inc ( ) ;
55+ SEARCH_METRICS
56+ . root_search_request_duration_seconds
57+ . with_label_values ( label_values)
58+ . observe ( self . start . elapsed ( ) . as_secs_f64 ( ) ) ;
59+ }
3260}
3361
34- /// Wrapper around the plan and search futures to track metrics.
62+ impl < F , R > Future for SearchPlanMetricsFuture < F >
63+ where F : Future < Output = crate :: Result < R > >
64+ {
65+ type Output = crate :: Result < R > ;
66+
67+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
68+ let this = self . project ( ) ;
69+ let response = ready ! ( this. tracked. poll( cx) ) ;
70+ if let Err ( err) = & response {
71+ tracing:: error!( ?err, "root search planning failed" ) ;
72+ }
73+ * this. is_success = Some ( response. is_ok ( ) ) ;
74+ Poll :: Ready ( Ok ( response?) )
75+ }
76+ }
77+
78+ // root search
79+
80+ /// Wrapper around the root search futures to track metrics.
3581#[ pin_project( PinnedDrop ) ]
3682pub struct RootSearchMetricsFuture < F > {
3783 #[ pin]
3884 pub tracked : F ,
3985 pub start : Instant ,
40- pub step : RootSearchMetricsStep ,
41- pub is_success : Option < bool > ,
86+ pub num_targeted_splits : usize ,
87+ pub status : Option < & ' static str > ,
4288}
4389
4490#[ pinned_drop]
4591impl < F > PinnedDrop for RootSearchMetricsFuture < F > {
4692 fn drop ( self : Pin < & mut Self > ) {
47- let ( num_targeted_splits, status) = match ( & self . step , self . is_success ) {
48- // is is a partial success, actual success is recorded during the search step
49- ( RootSearchMetricsStep :: Plan , Some ( true ) ) => return ,
50- ( RootSearchMetricsStep :: Plan , Some ( false ) ) => ( 0 , "plan-error" ) ,
51- ( RootSearchMetricsStep :: Plan , None ) => ( 0 , "plan-cancelled" ) ,
52- (
53- RootSearchMetricsStep :: Exec {
54- num_targeted_splits,
55- } ,
56- Some ( true ) ,
57- ) => ( * num_targeted_splits, "success" ) ,
58- (
59- RootSearchMetricsStep :: Exec {
60- num_targeted_splits,
61- } ,
62- Some ( false ) ,
63- ) => ( * num_targeted_splits, "error" ) ,
64- (
65- RootSearchMetricsStep :: Exec {
66- num_targeted_splits,
67- } ,
68- None ,
69- ) => ( * num_targeted_splits, "cancelled" ) ,
70- } ;
71-
93+ let status = self . status . unwrap_or ( "cancelled" ) ;
7294 let label_values = [ status] ;
7395 SEARCH_METRICS
7496 . root_search_requests_total
@@ -81,30 +103,39 @@ impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
81103 SEARCH_METRICS
82104 . root_search_targeted_splits
83105 . with_label_values ( label_values)
84- . observe ( num_targeted_splits as f64 ) ;
106+ . observe ( self . num_targeted_splits as f64 ) ;
85107 }
86108}
87109
88- impl < F , R , E > Future for RootSearchMetricsFuture < F >
89- where F : Future < Output = Result < R , E > >
110+ impl < F > Future for RootSearchMetricsFuture < F >
111+ where F : Future < Output = crate :: Result < SearchResponse > >
90112{
91- type Output = Result < R , E > ;
113+ type Output = crate :: Result < SearchResponse > ;
92114
93115 fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
94116 let this = self . project ( ) ;
95117 let response = ready ! ( this. tracked. poll( cx) ) ;
96- * this. is_success = Some ( response. is_ok ( ) ) ;
118+ if let Err ( err) = & response {
119+ tracing:: error!( ?err, "root search failed" ) ;
120+ }
121+ if let Ok ( resp) = & response {
122+ if resp. failed_splits . is_empty ( ) {
123+ * this. status = Some ( "success" ) ;
124+ } else {
125+ * this. status = Some ( "partial-success" ) ;
126+ }
127+ } else {
128+ * this. status = Some ( "error" ) ;
129+ }
97130 Poll :: Ready ( Ok ( response?) )
98131 }
99132}
100133
101- // leaf
134+ // leaf search
102135
103136/// Wrapper around the search future to track metrics.
104137#[ pin_project( PinnedDrop ) ]
105- pub struct LeafSearchMetricsFuture < F >
106- where F : Future < Output = Result < LeafSearchResponse , SearchError > >
107- {
138+ pub struct LeafSearchMetricsFuture < F > {
108139 #[ pin]
109140 pub tracked : F ,
110141 pub start : Instant ,
@@ -113,9 +144,7 @@ where F: Future<Output = Result<LeafSearchResponse, SearchError>>
113144}
114145
115146#[ pinned_drop]
116- impl < F > PinnedDrop for LeafSearchMetricsFuture < F >
117- where F : Future < Output = Result < LeafSearchResponse , SearchError > >
118- {
147+ impl < F > PinnedDrop for LeafSearchMetricsFuture < F > {
119148 fn drop ( self : Pin < & mut Self > ) {
120149 let label_values = [ self . status . unwrap_or ( "cancelled" ) ] ;
121150 SEARCH_METRICS
@@ -141,10 +170,10 @@ where F: Future<Output = Result<LeafSearchResponse, SearchError>>
141170 fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
142171 let this = self . project ( ) ;
143172 let response = ready ! ( this. tracked. poll( cx) ) ;
144- * this. status = if response. is_ok ( ) {
145- Some ( "success" )
146- } else {
147- Some ( "error" )
173+ * this. status = match & response {
174+ Ok ( resp ) if !resp . failed_splits . is_empty ( ) => Some ( "partial- success" ) ,
175+ Ok ( _ ) => Some ( "success" ) ,
176+ Err ( _ ) => Some ( "error" ) ,
148177 } ;
149178 Poll :: Ready ( Ok ( response?) )
150179 }
0 commit comments