33// SPDX-License-Identifier: AGPL-3.0-only
44// Please see LICENSE in the repository root for full details.
55
6- use std:: { future:: Future , str:: FromStr , sync:: Arc , time:: Duration } ;
6+ use std:: {
7+ future:: Future ,
8+ str:: FromStr ,
9+ sync:: { Arc , LazyLock } ,
10+ time:: Duration ,
11+ } ;
712
813use futures_util:: FutureExt as _;
9- use headers:: { ContentLength , HeaderMapExt as _, Host , UserAgent } ;
14+ use headers:: { ContentLength , HeaderMapExt as _, UserAgent } ;
1015use hyper_util:: client:: legacy:: connect:: {
1116 dns:: { GaiResolver , Name } ,
1217 HttpInfo ,
1318} ;
19+ use opentelemetry:: {
20+ metrics:: { Histogram , UpDownCounter } ,
21+ KeyValue ,
22+ } ;
1423use opentelemetry_http:: HeaderInjector ;
1524use opentelemetry_semantic_conventions:: {
1625 attribute:: { HTTP_REQUEST_BODY_SIZE , HTTP_RESPONSE_BODY_SIZE } ,
26+ metric:: { HTTP_CLIENT_ACTIVE_REQUESTS , HTTP_CLIENT_REQUEST_DURATION } ,
1727 trace:: {
18- CLIENT_ADDRESS , CLIENT_PORT , HTTP_REQUEST_METHOD , HTTP_RESPONSE_STATUS_CODE ,
19- NETWORK_TRANSPORT , NETWORK_TYPE , SERVER_ADDRESS , SERVER_PORT , URL_FULL ,
20- USER_AGENT_ORIGINAL ,
28+ ERROR_TYPE , HTTP_REQUEST_METHOD , HTTP_RESPONSE_STATUS_CODE , NETWORK_LOCAL_ADDRESS ,
29+ NETWORK_LOCAL_PORT , NETWORK_PEER_ADDRESS , NETWORK_PEER_PORT , NETWORK_TRANSPORT ,
30+ NETWORK_TYPE , SERVER_ADDRESS , SERVER_PORT , URL_FULL , URL_SCHEME , USER_AGENT_ORIGINAL ,
2131 } ,
2232} ;
33+ use tokio:: time:: Instant ;
2334use tower:: { BoxError , Service as _} ;
2435use tracing:: Instrument ;
2536use tracing_opentelemetry:: OpenTelemetrySpanExt ;
2637
38+ use crate :: METER ;
39+
2740static USER_AGENT : & str = concat ! ( "matrix-authentication-service/" , env!( "CARGO_PKG_VERSION" ) ) ;
2841
42+ static HTTP_REQUESTS_DURATION_HISTOGRAM : LazyLock < Histogram < u64 > > = LazyLock :: new ( || {
43+ METER
44+ . u64_histogram ( HTTP_CLIENT_REQUEST_DURATION )
45+ . with_unit ( "ms" )
46+ . with_description ( "Duration of HTTP client requests" )
47+ . init ( )
48+ } ) ;
49+
50+ static HTTP_REQUESTS_IN_FLIGHT : LazyLock < UpDownCounter < i64 > > = LazyLock :: new ( || {
51+ METER
52+ . i64_up_down_counter ( HTTP_CLIENT_ACTIVE_REQUESTS )
53+ . with_unit ( "{requests}" )
54+ . with_description ( "Number of HTTP client requests in flight" )
55+ . init ( )
56+ } ) ;
57+
2958struct TracingResolver {
3059 inner : GaiResolver ,
3160}
@@ -76,34 +105,38 @@ pub fn client() -> reqwest::Client {
76105async fn send_traced (
77106 request : reqwest:: RequestBuilder ,
78107) -> Result < reqwest:: Response , reqwest:: Error > {
79- // TODO: have in-flight and request metrics
108+ let start = Instant :: now ( ) ;
80109 let ( client, request) = request. build_split ( ) ;
81110 let mut request = request?;
82111
83112 let headers = request. headers ( ) ;
84- let host = headers. typed_get :: < Host > ( ) . map ( tracing:: field:: display) ;
113+ let server_address = request. url ( ) . host_str ( ) . map ( ToOwned :: to_owned) ;
114+ let server_port = request. url ( ) . port_or_known_default ( ) ;
115+ let scheme = request. url ( ) . scheme ( ) . to_owned ( ) ;
85116 let user_agent = headers
86117 . typed_get :: < UserAgent > ( )
87118 . map ( tracing:: field:: display) ;
88119 let content_length = headers. typed_get ( ) . map ( |ContentLength ( len) | len) ;
120+ let method = request. method ( ) . to_string ( ) ;
89121
90122 // Create a new span for the request
91123 let span = tracing:: info_span!(
92124 "http.client.request" ,
93125 "otel.kind" = "client" ,
94126 "otel.status_code" = tracing:: field:: Empty ,
95- { HTTP_REQUEST_METHOD } = %request . method( ) ,
127+ { HTTP_REQUEST_METHOD } = method,
96128 { URL_FULL } = %request. url( ) ,
97129 { HTTP_RESPONSE_STATUS_CODE } = tracing:: field:: Empty ,
98- { SERVER_ADDRESS } = host,
130+ { SERVER_ADDRESS } = server_address,
131+ { SERVER_PORT } = server_port,
99132 { HTTP_REQUEST_BODY_SIZE } = content_length,
100133 { HTTP_RESPONSE_BODY_SIZE } = tracing:: field:: Empty ,
101134 { NETWORK_TRANSPORT } = "tcp" ,
102135 { NETWORK_TYPE } = tracing:: field:: Empty ,
103- { SERVER_ADDRESS } = tracing:: field:: Empty ,
104- { SERVER_PORT } = tracing:: field:: Empty ,
105- { CLIENT_ADDRESS } = tracing:: field:: Empty ,
106- { CLIENT_PORT } = tracing:: field:: Empty ,
136+ { NETWORK_LOCAL_ADDRESS } = tracing:: field:: Empty ,
137+ { NETWORK_LOCAL_PORT } = tracing:: field:: Empty ,
138+ { NETWORK_PEER_ADDRESS } = tracing:: field:: Empty ,
139+ { NETWORK_PEER_PORT } = tracing:: field:: Empty ,
107140 { USER_AGENT_ORIGINAL } = user_agent,
108141 "rust.error" = tracing:: field:: Empty ,
109142 ) ;
@@ -115,9 +148,31 @@ async fn send_traced(
115148 propagator. inject_context ( & context, & mut injector) ;
116149 } ) ;
117150
151+ let mut metrics_labels = vec ! [
152+ KeyValue :: new( HTTP_REQUEST_METHOD , method. clone( ) ) ,
153+ KeyValue :: new( URL_SCHEME , scheme) ,
154+ ] ;
155+
156+ if let Some ( server_address) = server_address {
157+ metrics_labels. push ( KeyValue :: new ( SERVER_ADDRESS , server_address) ) ;
158+ }
159+
160+ if let Some ( server_port) = server_port {
161+ metrics_labels. push ( KeyValue :: new ( SERVER_PORT , i64:: from ( server_port) ) ) ;
162+ }
163+
164+ HTTP_REQUESTS_IN_FLIGHT . add ( 1 , & metrics_labels) ;
118165 async move {
119166 let span = tracing:: Span :: current ( ) ;
120- match client. execute ( request) . await {
167+ let result = client. execute ( request) . await ;
168+
169+ // XXX: We *could* loose this if the future is dropped before this, but let's
170+ // not worry about it for now. Ideally we would use a `Drop` guard to decrement
171+ // the counter
172+ HTTP_REQUESTS_IN_FLIGHT . add ( -1 , & metrics_labels) ;
173+
174+ let duration = start. elapsed ( ) . as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ;
175+ let result = match result {
121176 Ok ( response) => {
122177 span. record ( "otel.status_code" , "OK" ) ;
123178 span. record ( HTTP_RESPONSE_STATUS_CODE , response. status ( ) . as_u16 ( ) ) ;
@@ -128,26 +183,37 @@ async fn send_traced(
128183
129184 if let Some ( http_info) = response. extensions ( ) . get :: < HttpInfo > ( ) {
130185 let local = http_info. local_addr ( ) ;
131- let remote = http_info. remote_addr ( ) ;
132-
186+ let peer = http_info. remote_addr ( ) ;
133187 let family = if local. is_ipv4 ( ) { "ipv4" } else { "ipv6" } ;
134188 span. record ( NETWORK_TYPE , family) ;
135- span. record ( CLIENT_ADDRESS , remote . ip ( ) . to_string ( ) ) ;
136- span. record ( CLIENT_PORT , remote . port ( ) ) ;
137- span. record ( SERVER_ADDRESS , local . ip ( ) . to_string ( ) ) ;
138- span. record ( SERVER_PORT , local . port ( ) ) ;
189+ span. record ( NETWORK_LOCAL_ADDRESS , local . ip ( ) . to_string ( ) ) ;
190+ span. record ( NETWORK_LOCAL_PORT , local . port ( ) ) ;
191+ span. record ( NETWORK_PEER_ADDRESS , peer . ip ( ) . to_string ( ) ) ;
192+ span. record ( NETWORK_PEER_PORT , peer . port ( ) ) ;
139193 } else {
140194 tracing:: warn!( "No HttpInfo injected in response extensions" ) ;
141195 }
142196
197+ metrics_labels. push ( KeyValue :: new (
198+ HTTP_RESPONSE_STATUS_CODE ,
199+ i64:: from ( response. status ( ) . as_u16 ( ) ) ,
200+ ) ) ;
201+
143202 Ok ( response)
144203 }
145204 Err ( err) => {
146205 span. record ( "otel.status_code" , "ERROR" ) ;
147206 span. record ( "rust.error" , & err as & dyn std:: error:: Error ) ;
207+
208+ metrics_labels. push ( KeyValue :: new ( ERROR_TYPE , "NO_RESPONSE" ) ) ;
209+
148210 Err ( err)
149211 }
150- }
212+ } ;
213+
214+ HTTP_REQUESTS_DURATION_HISTOGRAM . record ( duration, & metrics_labels) ;
215+
216+ result
151217 }
152218 . instrument ( span)
153219 . await
0 commit comments