diff --git a/benches/pbs/src/main.rs b/benches/pbs/src/main.rs index aedde1e3..19a79687 100644 --- a/benches/pbs/src/main.rs +++ b/benches/pbs/src/main.rs @@ -160,6 +160,7 @@ fn get_mock_validator(bench: BenchConfig) -> RelayClient { enable_timing_games: false, target_first_request_ms: None, frequency_get_header_ms: None, + validator_registration_batch_size: None, }; RelayClient::new(config).unwrap() diff --git a/config.example.toml b/config.example.toml index 253d35d1..bb217b35 100644 --- a/config.example.toml +++ b/config.example.toml @@ -96,6 +96,9 @@ target_first_request_ms = 200 # Frequency in ms to send get_header requests # OPTIONAL frequency_get_header_ms = 300 +# Maximum number of validators to register in a single request. +# OPTIONAL, DEFAULT: "" (unlimited) +validator_registration_batch_size = "" # Configuration for the PBS multiplexers, which enable different configs to be used for get header requests, depending on validator pubkey # Note that: diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 5e42b6c6..68688109 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -50,6 +50,30 @@ pub struct RelayConfig { pub target_first_request_ms: Option, /// Frequency in ms to send get_header requests pub frequency_get_header_ms: Option, + /// Maximum number of validators to send to relays in one registration + /// request + #[serde(deserialize_with = "empty_string_as_none", default)] + pub validator_registration_batch_size: Option, +} + +fn empty_string_as_none<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + #[derive(Deserialize)] + #[serde(untagged)] + enum Helper { + Str(String), + Number(usize), + } + + match Helper::deserialize(deserializer)? { + Helper::Str(str) if str.is_empty() => Ok(None), + Helper::Str(str) => Ok(Some(str.parse().map_err(|_| { + serde::de::Error::custom("Expected empty string or number".to_string()) + })?)), + Helper::Number(number) => Ok(Some(number)), + } } impl RelayConfig { diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 6a745319..993e7963 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -34,16 +34,30 @@ pub async fn register_validator( let relays = state.all_relays().to_vec(); let mut handles = Vec::with_capacity(relays.len()); - for relay in relays { - handles.push(tokio::spawn( - send_register_validator_with_timeout( - registrations.clone(), - relay, - send_headers.clone(), - state.pbs_config().timeout_register_validator_ms, - ) - .in_current_span(), - )); + for relay in relays.clone() { + if let Some(batch_size) = relay.config.validator_registration_batch_size { + for batch in registrations.chunks(batch_size) { + handles.push(tokio::spawn( + send_register_validator_with_timeout( + batch.to_vec(), + relay.clone(), + send_headers.clone(), + state.pbs_config().timeout_register_validator_ms, + ) + .in_current_span(), + )); + } + } else { + handles.push(tokio::spawn( + send_register_validator_with_timeout( + registrations.clone(), + relay.clone(), + send_headers.clone(), + state.pbs_config().timeout_register_validator_ms, + ) + .in_current_span(), + )); + } } if state.pbs_config().wait_all_registrations { diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index a8f6a8a3..9b7ff2c6 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -36,11 +36,16 @@ impl MockValidator { } pub async fn do_register_validator(&self) -> Result<(), Error> { - let url = self.comm_boost.register_validator_url().unwrap(); + self.do_register_custom_validators(vec![]).await + } - let registration: Vec = vec![]; + pub async fn do_register_custom_validators( + &self, + registrations: Vec, + ) -> Result<(), Error> { + let url = self.comm_boost.register_validator_url().unwrap(); - self.comm_boost.client.post(url).json(®istration).send().await?.error_for_status()?; + self.comm_boost.client.post(url).json(®istrations).send().await?.error_for_status()?; Ok(()) } diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 5fdb425c..f6716a97 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -28,6 +28,26 @@ pub fn generate_mock_relay(port: u16, pubkey: BlsPublicKey) -> Result Result { + let entry = + RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port).parse()? }; + let config = RelayConfig { + entry, + id: None, + headers: None, + enable_timing_games: false, + target_first_request_ms: None, + frequency_get_header_ms: None, + validator_registration_batch_size: Some(batch_size), }; RelayClient::new(config) } diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index cc8f5d27..2e7f95e8 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -6,7 +6,7 @@ use std::{ u64, }; -use alloy::primitives::U256; +use alloy::{primitives::U256, rpc::types::beacon::relay::ValidatorRegistration}; use cb_common::{ config::{PbsConfig, PbsModuleConfig, RuntimeMuxConfig}, pbs::RelayClient, @@ -18,7 +18,7 @@ use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ mock_relay::{start_mock_relay_service, MockRelayState}, mock_validator::MockValidator, - utils::{generate_mock_relay, setup_test_env}, + utils::{generate_mock_relay, generate_mock_relay_with_batch_size, setup_test_env}, }; use eyre::Result; use tracing::info; @@ -143,6 +143,48 @@ async fn test_register_validators() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_batch_register_validators() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let port = 3310; + + let relays = vec![generate_mock_relay_with_batch_size(port + 1, *pubkey, 5)?]; + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); + + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let data = include_str!("../data/registration_holesky.json"); + let registrations: Vec = serde_json::from_str(data)?; + + let mock_validator = MockValidator::new(port)?; + info!("Sending register validator"); + let res = mock_validator.do_register_custom_validators(registrations.clone()).await; + + // registrations.len() == 17. 5 per batch, 4 batches + assert!(res.is_ok()); + assert_eq!(mock_state.received_register_validator(), 4); + + let mock_validator = MockValidator::new(port)?; + info!("Sending register validator"); + let res = mock_validator.do_register_custom_validators(registrations[..2].to_vec()).await; + + // Expected one more registration request + assert!(res.is_ok()); + assert_eq!(mock_state.received_register_validator(), 5); + + Ok(()) +} + #[tokio::test] async fn test_submit_block() -> Result<()> { setup_test_env();