diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs
index 4423f249..2c1f3f5e 100644
--- a/crates/pbs/src/mev_boost/get_header.rs
+++ b/crates/pbs/src/mev_boost/get_header.rs
@@ -24,7 +24,7 @@ use cb_common::{
 use futures::future::join_all;
 use parking_lot::RwLock;
 use reqwest::{header::USER_AGENT, StatusCode};
-use tokio::time::sleep;
+use tokio::time::{sleep, timeout};
 use tracing::{debug, error, warn, Instrument};
 use tree_hash::TreeHash;
 use url::Url;
@@ -192,21 +192,20 @@ async fn send_timed_get_header(
                 send_freq_ms, timeout_left_ms, "TG: sending multiple header requests"
             );
 
+            let mut request_id = 0;
+
             loop {
-                handles.push(tokio::spawn(
+                handles.push(tokio::spawn(timeout(
+                    Duration::from_millis(timeout_left_ms),
                     send_one_get_header(
                         params,
                         relay.clone(),
                         chain,
-                        RequestContext {
-                            timeout_ms: timeout_left_ms,
-                            url: url.clone(),
-                            headers: headers.clone(),
-                        },
+                        RequestContext { url: url.clone(), request_id, headers: headers.clone() },
                         validation.clone(),
                     )
                     .in_current_span(),
-                ));
+                )));
 
                 if timeout_left_ms > send_freq_ms {
                     // enough time for one more
@@ -215,38 +214,45 @@
                 } else {
                     break;
                 }
+
+                request_id += 1;
             }
 
+            debug!(relay_id = relay.id.as_ref(), "TG: joining header requests");
+
             let results = join_all(handles).await;
+
+            let filtered = results.into_iter().filter_map(|res| match res {
+                // ignore join error and timeouts
+                Ok(Ok(r)) => Some(r),
+                _ => None,
+            });
+
+            let mut maybe_header = None;
+            let mut start_time = 0;
+            let mut n_responses = 0;
             let mut n_headers = 0;
 
-            if let Some((_, maybe_header)) = results
-                .into_iter()
-                .filter_map(|res| {
-                    // ignore join error and timeouts, log other errors
-                    res.ok().and_then(|inner_res| match inner_res {
-                        Ok(maybe_header) => {
-                            if maybe_header.1.is_some() {
-                                n_headers += 1;
-                                Some(maybe_header)
-                            } else {
-                                // filter out 204 responses that are returned if the request
-                                // is after the relay cutoff
-                                None
+            for res in filtered {
+                n_responses += 1;
+                match res {
+                    Ok((req_start_time, req_header)) => {
+                        if req_header.is_some() {
+                            n_headers += 1;
+
+                            if req_start_time > start_time {
+                                start_time = req_start_time;
+                                maybe_header = req_header;
                             }
                         }
-                        Err(err) if err.is_timeout() => None,
-                        Err(err) => {
-                            error!(relay_id = relay.id.as_ref(),%err, "TG: error sending header request");
-                            None
-                        }
-                    })
-                })
-                .max_by_key(|(start_time, _)| *start_time)
-            {
-                debug!(relay_id = relay.id.as_ref(), n_headers, "TG: received headers from relay");
-                return Ok(maybe_header);
-            } else {
+                    }
+                    Err(err) => {
+                        error!(relay_id = relay.id.as_ref(),%err, "TG: error sending header request");
+                    }
+                }
+            }
+
+            if n_responses == 0 {
                 // all requests failed
                 warn!(relay_id = relay.id.as_ref(), "TG: no headers received");
@@ -255,6 +261,13 @@
                     code: TIMEOUT_ERROR_CODE,
                 });
             }
+
+            debug!(
+                relay_id = relay.id.as_ref(),
+                n_responses, n_headers, "TG: received responses from relay"
+            );
+
+            return Ok(maybe_header);
         }
     }
 
@@ -263,7 +276,7 @@
         params,
         relay,
         chain,
-        RequestContext { timeout_ms: timeout_left_ms, url, headers },
+        RequestContext { url, request_id: 0, headers },
         validation,
     )
     .await
@@ -272,7 +285,7 @@
 
 struct RequestContext {
     url: Url,
-    timeout_ms: u64,
+    request_id: u64,
     headers: HeaderMap,
 }
 
@@ -291,6 +304,12 @@ async fn send_one_get_header(
     mut req_config: RequestContext,
     validation: ValidationContext,
 ) -> Result<(u64, Option<GetHeaderResponse>), PbsError> {
+    debug!(
+        relay_id = relay.id.as_ref(),
+        req_id = req_config.request_id,
+        "sending get_header request"
+    );
+
     // the timestamp in the header is the consensus block time which is fixed,
     // use the beginning of the request as proxy to make sure we use only the
     // last one received
@@ -298,14 +317,7 @@
     req_config.headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(start_request_time));
 
     let start_request = Instant::now();
-    let res = match relay
-        .client
-        .get(req_config.url)
-        .timeout(Duration::from_millis(req_config.timeout_ms))
-        .headers(req_config.headers)
-        .send()
-        .await
-    {
+    let res = match relay.client.get(req_config.url).headers(req_config.headers).send().await {
         Ok(res) => res,
         Err(err) => {
             RELAY_STATUS_CODE
@@ -315,6 +327,8 @@
         }
     };
 
+    debug!(relay_id = relay.id.as_ref(), req_id = req_config.request_id, "received relay response");
+
     let request_latency = start_request.elapsed();
     RELAY_LATENCY
         .with_label_values(&[GET_HEADER_ENDPOINT_TAG, &relay.id])
@@ -324,6 +338,9 @@
     RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc();
 
     let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER).await?;
+
+    debug!(relay_id = relay.id.as_ref(), req_id = req_config.request_id, "read relay response");
+
     if !code.is_success() {
         return Err(PbsError::RelayResponse {
             error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
@@ -333,6 +350,7 @@
     if code == StatusCode::NO_CONTENT {
         debug!(
             relay_id = relay.id.as_ref(),
+            req_id = req_config.request_id,
            ?code,
            latency = ?request_latency,
            response = ?response_bytes,
@@ -353,6 +371,7 @@
 
     debug!(
         relay_id = relay.id.as_ref(),
+        req_id = req_config.request_id,
         latency = ?request_latency,
         version = get_header_response.version(),
         value_eth = format_ether(get_header_response.value()),