Skip to content
1 change: 1 addition & 0 deletions benches/pbs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ fn get_mock_validator(bench: BenchConfig) -> RelayClient {
enable_timing_games: false,
target_first_request_ms: None,
frequency_get_header_ms: None,
validator_registration_batch_size: None,
};

RelayClient::new(config).unwrap()
Expand Down
3 changes: 3 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ target_first_request_ms = 200
# Frequency in ms to send get_header requests
# OPTIONAL
frequency_get_header_ms = 300
# Maximum number of validators to register in a single request.
# OPTIONAL, DEFAULT: "" (unlimited)
validator_registration_batch_size = ""

# Configuration for the PBS multiplexers, which enable different configs to be used for get header requests, depending on validator pubkey
# Note that:
Expand Down
22 changes: 22 additions & 0 deletions crates/common/src/config/pbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,28 @@ pub struct RelayConfig {
pub target_first_request_ms: Option<u64>,
/// Frequency in ms to send get_header requests
pub frequency_get_header_ms: Option<u64>,
/// Maximum number of validators to send to relays in one registration
/// request
#[serde(deserialize_with = "empty_string_as_none", default)]
pub validator_registration_batch_size: Option<usize>,
}

fn empty_string_as_none<'de, D>(deserializer: D) -> Result<Option<usize>, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum Helper {
Str(String),
Number(usize),
}

match Helper::deserialize(deserializer)? {
Helper::Str(str) if str.is_empty() => Ok(None),
Helper::Number(number) => Ok(Some(number)),
_ => Err(serde::de::Error::custom("Expected empty string or number".to_string())),
}
}

impl RelayConfig {
Expand Down
34 changes: 24 additions & 10 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,30 @@ pub async fn register_validator<S: BuilderApiState>(

let relays = state.all_relays().to_vec();
let mut handles = Vec::with_capacity(relays.len());
for relay in relays {
handles.push(tokio::spawn(
send_register_validator_with_timeout(
registrations.clone(),
relay,
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
)
.in_current_span(),
));
for relay in relays.clone() {
if let Some(batch_size) = relay.config.validator_registration_batch_size {
for batch in registrations.chunks(batch_size) {
handles.push(tokio::spawn(
send_register_validator_with_timeout(
batch.to_vec(),
relay.clone(),
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
)
.in_current_span(),
));
}
} else {
handles.push(tokio::spawn(
send_register_validator_with_timeout(
registrations.clone(),
relay.clone(),
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
)
.in_current_span(),
));
}
}

if state.pbs_config().wait_all_registrations {
Expand Down
11 changes: 8 additions & 3 deletions tests/src/mock_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@ impl MockValidator {
}

pub async fn do_register_validator(&self) -> Result<(), Error> {
let url = self.comm_boost.register_validator_url().unwrap();
self.do_register_custom_validators(vec![]).await
}

let registration: Vec<ValidatorRegistration> = vec![];
pub async fn do_register_custom_validators(
&self,
registrations: Vec<ValidatorRegistration>,
) -> Result<(), Error> {
let url = self.comm_boost.register_validator_url().unwrap();

self.comm_boost.client.post(url).json(&registration).send().await?.error_for_status()?;
self.comm_boost.client.post(url).json(&registrations).send().await?.error_for_status()?;

Ok(())
}
Expand Down
20 changes: 20 additions & 0 deletions tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ pub fn generate_mock_relay(port: u16, pubkey: BlsPublicKey) -> Result<RelayClien
enable_timing_games: false,
target_first_request_ms: None,
frequency_get_header_ms: None,
validator_registration_batch_size: None,
};
RelayClient::new(config)
}

pub fn generate_mock_relay_with_batch_size(
port: u16,
pubkey: BlsPublicKey,
batch_size: usize,
) -> Result<RelayClient> {
let entry =
RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port).parse()? };
let config = RelayConfig {
entry,
id: None,
headers: None,
enable_timing_games: false,
target_first_request_ms: None,
frequency_get_header_ms: None,
validator_registration_batch_size: Some(batch_size),
};
RelayClient::new(config)
}
46 changes: 44 additions & 2 deletions tests/tests/pbs_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
u64,
};

use alloy::primitives::U256;
use alloy::{primitives::U256, rpc::types::beacon::relay::ValidatorRegistration};
use cb_common::{
config::{PbsConfig, PbsModuleConfig, RuntimeMuxConfig},
pbs::RelayClient,
Expand All @@ -18,7 +18,7 @@ use cb_pbs::{DefaultBuilderApi, PbsService, PbsState};
use cb_tests::{
mock_relay::{start_mock_relay_service, MockRelayState},
mock_validator::MockValidator,
utils::{generate_mock_relay, setup_test_env},
utils::{generate_mock_relay, generate_mock_relay_with_batch_size, setup_test_env},
};
use eyre::Result;
use tracing::info;
Expand Down Expand Up @@ -143,6 +143,48 @@ async fn test_register_validators() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_batch_register_validators() -> Result<()> {
setup_test_env();
let signer = random_secret();
let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into();

let chain = Chain::Holesky;
let port = 3310;

let relays = vec![generate_mock_relay_with_batch_size(port + 1, *pubkey, 5)?];
let mock_state = Arc::new(MockRelayState::new(chain, signer));
tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1));

let config = to_pbs_config(chain, get_pbs_static_config(port), relays);
let state = PbsState::new(config);
tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));

// leave some time to start servers
tokio::time::sleep(Duration::from_millis(100)).await;

let data = include_str!("../data/registration_holesky.json");
let registrations: Vec<ValidatorRegistration> = serde_json::from_str(data)?;

let mock_validator = MockValidator::new(port)?;
info!("Sending register validator");
let res = mock_validator.do_register_custom_validators(registrations.clone()).await;

// registrations.len() == 17. 5 per batch, 4 batches
assert!(res.is_ok());
assert_eq!(mock_state.received_register_validator(), 4);

let mock_validator = MockValidator::new(port)?;
info!("Sending register validator");
let res = mock_validator.do_register_custom_validators(registrations[..2].to_vec()).await;

// Expected one more registration request
assert!(res.is_ok());
assert_eq!(mock_state.received_register_validator(), 5);

Ok(())
}

#[tokio::test]
async fn test_submit_block() -> Result<()> {
setup_test_env();
Expand Down
Loading