Skip to content

Commit 238a569

Browse files
committed
chore(bolt-boost): track pbs state
1 parent 48d940d commit 238a569

File tree

4 files changed

+57
-13
lines changed

4 files changed

+57
-13
lines changed

bolt-boost/Cargo.lock

Lines changed: 5 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bolt-boost/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ cb-pbs = { git = "https://github.com/commit-boost/commit-boost-client", rev = "v
5050
rand = "0.8.5"
5151
parking_lot = "0.12.3"
5252
lazy_static = "1.5.0"
53+
dashmap = "6.1.0"
54+
uuid = "1.12.0"
5355

5456
[dev-dependencies]
5557
# NOTE: we need this in order to play nice with Lighthouse types at version 6.0.1

bolt-boost/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::{
2121

2222
#[tokio::main]
2323
async fn main() -> Result<()> {
24-
let (pbs_config, extra) = load_pbs_custom_config::<Config>()?;
24+
let (pbs_config, extra) = load_pbs_custom_config::<Config>().await?;
2525
tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).init();
2626

2727
let chain = pbs_config.chain;

bolt-boost/src/server.rs

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,26 @@ use axum::{
1111
routing::{get, post},
1212
Json, Router,
1313
};
14+
use dashmap::DashMap;
1415
use eyre::Result;
1516
use futures::{future::join_all, stream::FuturesUnordered, StreamExt};
1617
use serde::Serialize;
1718
use std::{
1819
collections::HashMap,
20+
sync::{Arc, Mutex},
1921
time::{Duration, Instant},
2022
};
2123
use tokio::time::sleep;
2224
use tracing::{debug, error, info, warn, Instrument};
25+
use uuid::Uuid;
2326

2427
use cb_common::{
2528
config::PbsConfig,
2629
constants::APPLICATION_BUILDER_DOMAIN,
2730
pbs::{
2831
error::{PbsError, ValidationError},
2932
GetHeaderResponse, RelayClient, SignedExecutionPayloadHeader, EMPTY_TX_ROOT_HASH,
30-
HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS,
33+
HEADER_START_TIME_UNIX_MS,
3134
},
3235
signature::verify_signed_message,
3336
types::Chain,
@@ -54,6 +57,7 @@ const DELEGATE_PATH: &str = "/constraints/v1/builder/delegate";
5457
const REVOKE_PATH: &str = "/constraints/v1/builder/revoke";
5558
const GET_HEADER_WITH_PROOFS_PATH: &str =
5659
"/eth/v1/builder/header_with_proofs/:slot/:parent_hash/:pubkey";
60+
const HEADER_SLOT_UUID_KEY: &str = "X-MEVBoost-SlotID";
5761

5862
const TIMEOUT_ERROR_CODE: u16 = 555;
5963

@@ -63,13 +67,48 @@ pub struct BuilderState {
6367
#[allow(unused)]
6468
config: Config,
6569
constraints: ConstraintsCache,
70+
current_slot_info: Arc<Mutex<(u64, Uuid)>>,
71+
bid_cache: Arc<DashMap<u64, Vec<GetHeaderResponse>>>,
6672
}
6773

6874
impl BuilderApiState for BuilderState {}
6975

7076
impl BuilderState {
7177
pub fn from_config(config: Config) -> Self {
72-
Self { config, constraints: ConstraintsCache::new() }
78+
Self {
79+
config,
80+
constraints: ConstraintsCache::new(),
81+
current_slot_info: Arc::new(Mutex::new((0, Uuid::new_v4()))),
82+
bid_cache: Arc::new(DashMap::new()),
83+
}
84+
}
85+
86+
pub fn get_or_update_slot_uuid(&self, last_slot: u64) -> Uuid {
87+
let mut guard = self.current_slot_info.lock().expect("poisoned");
88+
if guard.0 < last_slot {
89+
// new slot
90+
guard.0 = last_slot;
91+
guard.1 = Uuid::new_v4();
92+
self.clear_old_bids(last_slot);
93+
}
94+
guard.1
95+
}
96+
97+
pub fn get_slot_and_uuid(&self) -> (u64, Uuid) {
98+
let guard = self.current_slot_info.lock().expect("poisoned");
99+
*guard
100+
}
101+
102+
/// Add bids to the cache for a given slot, returns the highest value bid
103+
pub fn add_bids(&self, slot: u64, bids: Vec<GetHeaderResponse>) -> Option<GetHeaderResponse> {
104+
let mut slot_entry = self.bid_cache.entry(slot).or_default();
105+
slot_entry.extend(bids);
106+
slot_entry.iter().max_by_key(|bid| bid.data.message.value).cloned()
107+
}
108+
109+
/// Clear bids which are more than ~3 minutes old (15 slots)
110+
fn clear_old_bids(&self, last_slot: u64) {
111+
self.bid_cache.retain(|slot, _| last_slot.saturating_sub(*slot) < 15)
73112
}
74113
}
75114

@@ -90,7 +129,7 @@ impl BuilderApi<BuilderState> for ConstraintsApi {
90129
req_headers: HeaderMap,
91130
state: PbsState<BuilderState>,
92131
) -> eyre::Result<()> {
93-
let (slot, _) = state.get_slot_and_uuid();
132+
let (slot, _) = state.data.get_slot_and_uuid();
94133

95134
info!("Cleaning up constraints before slot {slot}");
96135
state.data.constraints.remove_before(slot);
@@ -118,7 +157,7 @@ async fn submit_constraints(
118157
Json(constraints): Json<Vec<SignedConstraints>>,
119158
) -> Result<impl IntoResponse, PbsClientError> {
120159
info!("Submitting {} constraints to relays", constraints.len());
121-
let (current_slot, _) = state.get_slot_and_uuid();
160+
let (current_slot, _) = state.data.get_slot_and_uuid();
122161

123162
// Save constraints for the slot to verify proofs against later.
124163
for signed_constraints in &constraints {
@@ -172,7 +211,7 @@ async fn get_header_with_proofs(
172211
Path(params): Path<GetHeaderParams>,
173212
req_headers: HeaderMap,
174213
) -> Result<impl IntoResponse, PbsClientError> {
175-
let slot_uuid = state.get_or_update_slot_uuid(params.slot);
214+
let slot_uuid = state.data.get_or_update_slot_uuid(params.slot);
176215

177216
let ua = get_user_agent(&req_headers);
178217
let ms_into_slot = ms_into_slot(params.slot, state.config.chain);
@@ -201,7 +240,7 @@ async fn get_header_with_proofs(
201240
.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string()).unwrap());
202241
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers).unwrap());
203242

204-
let relays = state.relays();
243+
let (_, relays, _) = state.mux_config_and_relays(&params.pubkey);
205244
let mut handles = Vec::with_capacity(relays.len());
206245
for relay in relays {
207246
handles.push(send_timed_get_header(
@@ -258,7 +297,7 @@ async fn get_header_with_proofs(
258297
}
259298
}
260299

261-
if let Some(winning_bid) = state.add_bids(params.slot, relay_bids) {
300+
if let Some(winning_bid) = state.data.add_bids(params.slot, relay_bids) {
262301
let header_with_proofs = GetHeaderWithProofsResponse {
263302
data: SignedExecutionPayloadHeaderWithProofs {
264303
// If there are no proofs, default to empty. This should never happen unless there
@@ -531,11 +570,12 @@ async fn post_request<T>(
531570
where
532571
T: Serialize,
533572
{
534-
debug!("Sending POST request to {} relays", state.relays().len());
573+
let relays = state.config.relays;
574+
debug!("Sending POST request to {} relays", relays.len());
535575
// Forward constraints to all relays.
536576
let mut responses = FuturesUnordered::new();
537577

538-
for relay in state.relays() {
578+
for relay in relays {
539579
let url = relay.get_url(path).map_err(|_| PbsClientError::BadRequest)?;
540580
responses.push(relay.client.post(url).json(&body).send());
541581
}

0 commit comments

Comments
 (0)