Skip to content

Commit 2dc01d1

Browse files
feat(pbs): batch size config for validators registration (#244)
Co-authored-by: ltitanb <[email protected]>
1 parent 248538a commit 2dc01d1

File tree

7 files changed

+124
-15
lines changed

7 files changed

+124
-15
lines changed

benches/pbs/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ fn get_mock_validator(bench: BenchConfig) -> RelayClient {
160160
enable_timing_games: false,
161161
target_first_request_ms: None,
162162
frequency_get_header_ms: None,
163+
validator_registration_batch_size: None,
163164
};
164165

165166
RelayClient::new(config).unwrap()

config.example.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ target_first_request_ms = 200
9696
# Frequency in ms to send get_header requests
9797
# OPTIONAL
9898
frequency_get_header_ms = 300
99+
# Maximum number of validators to register in a single request.
100+
# OPTIONAL, DEFAULT: "" (unlimited)
101+
validator_registration_batch_size = ""
99102

100103
# Configuration for the PBS multiplexers, which enable different configs to be used for get header requests, depending on validator pubkey
101104
# Note that:

crates/common/src/config/pbs.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,30 @@ pub struct RelayConfig {
5050
pub target_first_request_ms: Option<u64>,
5151
/// Frequency in ms to send get_header requests
5252
pub frequency_get_header_ms: Option<u64>,
53+
/// Maximum number of validators to send to relays in one registration
54+
/// request
55+
#[serde(deserialize_with = "empty_string_as_none", default)]
56+
pub validator_registration_batch_size: Option<usize>,
57+
}
58+
59+
fn empty_string_as_none<'de, D>(deserializer: D) -> Result<Option<usize>, D::Error>
60+
where
61+
D: serde::Deserializer<'de>,
62+
{
63+
#[derive(Deserialize)]
64+
#[serde(untagged)]
65+
enum Helper {
66+
Str(String),
67+
Number(usize),
68+
}
69+
70+
match Helper::deserialize(deserializer)? {
71+
Helper::Str(str) if str.is_empty() => Ok(None),
72+
Helper::Str(str) => Ok(Some(str.parse().map_err(|_| {
73+
serde::de::Error::custom("Expected empty string or number".to_string())
74+
})?)),
75+
Helper::Number(number) => Ok(Some(number)),
76+
}
5377
}
5478

5579
impl RelayConfig {

crates/pbs/src/mev_boost/register_validator.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,30 @@ pub async fn register_validator<S: BuilderApiState>(
3434

3535
let relays = state.all_relays().to_vec();
3636
let mut handles = Vec::with_capacity(relays.len());
37-
for relay in relays {
38-
handles.push(tokio::spawn(
39-
send_register_validator_with_timeout(
40-
registrations.clone(),
41-
relay,
42-
send_headers.clone(),
43-
state.pbs_config().timeout_register_validator_ms,
44-
)
45-
.in_current_span(),
46-
));
37+
for relay in relays.clone() {
38+
if let Some(batch_size) = relay.config.validator_registration_batch_size {
39+
for batch in registrations.chunks(batch_size) {
40+
handles.push(tokio::spawn(
41+
send_register_validator_with_timeout(
42+
batch.to_vec(),
43+
relay.clone(),
44+
send_headers.clone(),
45+
state.pbs_config().timeout_register_validator_ms,
46+
)
47+
.in_current_span(),
48+
));
49+
}
50+
} else {
51+
handles.push(tokio::spawn(
52+
send_register_validator_with_timeout(
53+
registrations.clone(),
54+
relay.clone(),
55+
send_headers.clone(),
56+
state.pbs_config().timeout_register_validator_ms,
57+
)
58+
.in_current_span(),
59+
));
60+
}
4761
}
4862

4963
if state.pbs_config().wait_all_registrations {

tests/src/mock_validator.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,16 @@ impl MockValidator {
3636
}
3737

3838
pub async fn do_register_validator(&self) -> Result<(), Error> {
39-
let url = self.comm_boost.register_validator_url().unwrap();
39+
self.do_register_custom_validators(vec![]).await
40+
}
4041

41-
let registration: Vec<ValidatorRegistration> = vec![];
42+
pub async fn do_register_custom_validators(
43+
&self,
44+
registrations: Vec<ValidatorRegistration>,
45+
) -> Result<(), Error> {
46+
let url = self.comm_boost.register_validator_url().unwrap();
4247

43-
self.comm_boost.client.post(url).json(&registration).send().await?.error_for_status()?;
48+
self.comm_boost.client.post(url).json(&registrations).send().await?.error_for_status()?;
4449

4550
Ok(())
4651
}

tests/src/utils.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,26 @@ pub fn generate_mock_relay(port: u16, pubkey: BlsPublicKey) -> Result<RelayClien
2828
enable_timing_games: false,
2929
target_first_request_ms: None,
3030
frequency_get_header_ms: None,
31+
validator_registration_batch_size: None,
32+
};
33+
RelayClient::new(config)
34+
}
35+
36+
pub fn generate_mock_relay_with_batch_size(
37+
port: u16,
38+
pubkey: BlsPublicKey,
39+
batch_size: usize,
40+
) -> Result<RelayClient> {
41+
let entry =
42+
RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port).parse()? };
43+
let config = RelayConfig {
44+
entry,
45+
id: None,
46+
headers: None,
47+
enable_timing_games: false,
48+
target_first_request_ms: None,
49+
frequency_get_header_ms: None,
50+
validator_registration_batch_size: Some(batch_size),
3151
};
3252
RelayClient::new(config)
3353
}

tests/tests/pbs_integration.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
u64,
77
};
88

9-
use alloy::primitives::U256;
9+
use alloy::{primitives::U256, rpc::types::beacon::relay::ValidatorRegistration};
1010
use cb_common::{
1111
config::{PbsConfig, PbsModuleConfig, RuntimeMuxConfig},
1212
pbs::RelayClient,
@@ -18,7 +18,7 @@ use cb_pbs::{DefaultBuilderApi, PbsService, PbsState};
1818
use cb_tests::{
1919
mock_relay::{start_mock_relay_service, MockRelayState},
2020
mock_validator::MockValidator,
21-
utils::{generate_mock_relay, setup_test_env},
21+
utils::{generate_mock_relay, generate_mock_relay_with_batch_size, setup_test_env},
2222
};
2323
use eyre::Result;
2424
use tracing::info;
@@ -143,6 +143,48 @@ async fn test_register_validators() -> Result<()> {
143143
Ok(())
144144
}
145145

146+
#[tokio::test]
147+
async fn test_batch_register_validators() -> Result<()> {
148+
setup_test_env();
149+
let signer = random_secret();
150+
let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into();
151+
152+
let chain = Chain::Holesky;
153+
let port = 3310;
154+
155+
let relays = vec![generate_mock_relay_with_batch_size(port + 1, *pubkey, 5)?];
156+
let mock_state = Arc::new(MockRelayState::new(chain, signer));
157+
tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1));
158+
159+
let config = to_pbs_config(chain, get_pbs_static_config(port), relays);
160+
let state = PbsState::new(config);
161+
tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));
162+
163+
// leave some time to start servers
164+
tokio::time::sleep(Duration::from_millis(100)).await;
165+
166+
let data = include_str!("../data/registration_holesky.json");
167+
let registrations: Vec<ValidatorRegistration> = serde_json::from_str(data)?;
168+
169+
let mock_validator = MockValidator::new(port)?;
170+
info!("Sending register validator");
171+
let res = mock_validator.do_register_custom_validators(registrations.clone()).await;
172+
173+
// registrations.len() == 17. 5 per batch, 4 batches
174+
assert!(res.is_ok());
175+
assert_eq!(mock_state.received_register_validator(), 4);
176+
177+
let mock_validator = MockValidator::new(port)?;
178+
info!("Sending register validator");
179+
let res = mock_validator.do_register_custom_validators(registrations[..2].to_vec()).await;
180+
181+
// Expected one more registration request
182+
assert!(res.is_ok());
183+
assert_eq!(mock_state.received_register_validator(), 5);
184+
185+
Ok(())
186+
}
187+
146188
#[tokio::test]
147189
async fn test_submit_block() -> Result<()> {
148190
setup_test_env();

0 commit comments

Comments
 (0)