@@ -10,9 +10,11 @@ use cb_pbs::{DefaultBuilderApi, PbsService, PbsState};
1010use cb_tests:: {
1111 mock_relay:: { MockRelayState , start_mock_relay_service} ,
1212 mock_ssv:: { SsvMockState , create_mock_ssv_server} ,
13+ mock_validator:: MockValidator ,
1314 utils:: { generate_mock_relay, get_pbs_static_config, to_pbs_config} ,
1415} ;
1516use eyre:: Result ;
17+ use reqwest:: StatusCode ;
1618use tokio:: sync:: RwLock ;
1719use tracing:: info;
1820use url:: Url ;
@@ -24,30 +26,46 @@ async fn test_auto_refresh() -> Result<()> {
2426 // This test reads the log files to verify behavior, so we can't attach a global
2527 // trace listener setup_test_env();
2628
27- let signer = random_secret ( ) ;
28- let pubkey = signer. public_key ( ) ;
29+ // Generate 3 keys: one not in the mux relay, one in the relay, and one that
30+ // hasn't been added yet but will be later. The existing key isn't used but is
31+ // needed in the initial config since CB won't start a mux without at least one
32+ // key.
33+ let default_signer = random_secret ( ) ;
34+ let default_pubkey = default_signer. public_key ( ) ;
35+ let existing_mux_signer = random_secret ( ) ;
36+ let existing_mux_pubkey = existing_mux_signer. public_key ( ) ;
37+ let new_mux_signer = random_secret ( ) ;
38+ let new_mux_pubkey = new_mux_signer. public_key ( ) ;
2939
3040 let chain = Chain :: Hoodi ;
3141 let pbs_port = 3710 ;
3242
3343 // Start the mock SSV API server
34- let mut next_port = pbs_port + 1 ;
35- let ssv_api_port = next_port;
36- next_port += 1 ;
44+ let ssv_api_port = pbs_port + 1 ;
3745 let ssv_api_url = Url :: parse ( & format ! ( "http://localhost:{ssv_api_port}" ) ) ?;
3846 let mock_ssv_state = SsvMockState {
39- validators : Arc :: new ( RwLock :: new ( vec ! [ SSVValidator { pubkey: pubkey. clone( ) } ] ) ) ,
47+ validators : Arc :: new ( RwLock :: new ( vec ! [ SSVValidator {
48+ pubkey: existing_mux_pubkey. clone( ) ,
49+ } ] ) ) ,
4050 force_timeout : Arc :: new ( RwLock :: new ( false ) ) ,
4151 } ;
4252 let ssv_server_handle =
4353 create_mock_ssv_server ( ssv_api_port, Some ( mock_ssv_state. clone ( ) ) ) . await ?;
4454
55+ // Start a default relay for non-mux keys
56+ let default_relay_port = ssv_api_port + 1 ;
57+ let default_relay = generate_mock_relay ( default_relay_port, default_pubkey. clone ( ) ) ?;
58+ let default_relay_state = Arc :: new ( MockRelayState :: new ( chain, default_signer. clone ( ) ) ) ;
59+ let default_relay_task =
60+ tokio:: spawn ( start_mock_relay_service ( default_relay_state. clone ( ) , default_relay_port) ) ;
61+
4562 // Start a mock relay to be used by the mux
46- let default_relay = generate_mock_relay ( next_port, pubkey. clone ( ) ) ?;
47- let relay_id = default_relay. id . clone ( ) . to_string ( ) ;
48- let mock_state = Arc :: new ( MockRelayState :: new ( chain, signer) ) ;
49- tokio:: spawn ( start_mock_relay_service ( mock_state. clone ( ) , next_port) ) ;
50- next_port += 1 ;
63+ let mux_relay_port = default_relay_port + 1 ;
64+ let mux_relay = generate_mock_relay ( mux_relay_port, default_pubkey. clone ( ) ) ?;
65+ let mux_relay_id = mux_relay. id . clone ( ) . to_string ( ) ;
66+ let mux_relay_state = Arc :: new ( MockRelayState :: new ( chain, default_signer) ) ;
67+ let mux_relay_task =
68+ tokio:: spawn ( start_mock_relay_service ( mux_relay_state. clone ( ) , mux_relay_port) ) ;
5169
5270 // Create the registry mux
5371 let loader = MuxKeysLoader :: Registry {
@@ -57,11 +75,11 @@ async fn test_auto_refresh() -> Result<()> {
5775 } ;
5876 let muxes = PbsMuxes {
5977 muxes : vec ! [ MuxConfig {
60- id: relay_id . clone( ) ,
78+ id: mux_relay_id . clone( ) ,
6179 loader: Some ( loader) ,
62- late_in_slot_time_ms: Some ( 2000 ) ,
63- relays: vec![ ( * default_relay . config) . clone( ) ] ,
64- timeout_get_header_ms: Some ( 750 ) ,
80+ late_in_slot_time_ms: Some ( u64 :: MAX ) ,
81+ relays: vec![ ( * mux_relay . config) . clone( ) ] ,
82+ timeout_get_header_ms: Some ( u64 :: MAX - 1 ) ,
6583 validator_pubkeys: vec![ ] ,
6684 } ] ,
6785 } ;
@@ -71,45 +89,70 @@ async fn test_auto_refresh() -> Result<()> {
7189 pbs_config. ssv_api_url = Some ( ssv_api_url. clone ( ) ) ;
7290 pbs_config. mux_registry_refresh_interval_seconds = 1 ; // Refresh the mux every second
7391 let ( mux_lookup, registry_muxes) = muxes. validate_and_fill ( chain, & pbs_config) . await ?;
74- let relays = vec ! [ default_relay. clone( ) ] ;
92+ let relays = vec ! [ default_relay. clone( ) ] ; // Default relay only
7593 let mut config = to_pbs_config ( chain, pbs_config, relays) ;
76- config. all_relays = vec ! [ default_relay . clone( ) ] ;
94+ config. all_relays . push ( mux_relay . clone ( ) ) ; // Add the mux relay to just this field
7795 config. mux_lookup = Some ( mux_lookup) ;
7896 config. registry_muxes = Some ( registry_muxes) ;
7997
8098 // Run PBS service
8199 let state = PbsState :: new ( config) ;
82100 let pbs_server = tokio:: spawn ( PbsService :: run :: < ( ) , DefaultBuilderApi > ( state) ) ;
83- info ! ( "Started PBS server with pubkey {pubkey}" ) ;
101+ info ! ( "Started PBS server with pubkey {default_pubkey}" ) ;
102+
103+ // Wait for the server to start
104+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
105+
106+ // Try to run a get_header on the new pubkey, which should use the default
107+ // relay only since it hasn't been seen in the mux yet
108+ let mock_validator = MockValidator :: new ( pbs_port) ?;
109+ info ! ( "Sending get header" ) ;
110+ let res = mock_validator. do_get_header ( Some ( new_mux_pubkey. clone ( ) ) ) . await ?;
111+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
112+ assert_eq ! ( default_relay_state. received_get_header( ) , 1 ) ; // default relay was used
113+ assert_eq ! ( mux_relay_state. received_get_header( ) , 0 ) ; // mux relay was not used
84114
85115 // Wait for the first refresh to complete
86116 let wait_for_refresh_time = Duration :: from_secs ( 2 ) ;
87117 tokio:: time:: sleep ( wait_for_refresh_time) . await ;
88118
89119 // Check the logs to ensure a refresh happened
90- assert ! ( logs_contain( & format!( "fetched 1 pubkeys for registry mux {relay_id }" ) ) ) ;
91- assert ! ( !logs_contain( & format!( "fetched 2 pubkeys for registry mux {relay_id }" ) ) ) ;
120+ assert ! ( logs_contain( & format!( "fetched 1 pubkeys for registry mux {mux_relay_id }" ) ) ) ;
121+ assert ! ( !logs_contain( & format!( "fetched 2 pubkeys for registry mux {mux_relay_id }" ) ) ) ;
92122 assert ! ( !logs_contain( "adding new pubkey" ) ) ;
93123
94124 // Add another validator
95- let new_secret = random_secret ( ) ;
96- let new_pubkey = new_secret. public_key ( ) ;
97125 {
98126 let mut validators = mock_ssv_state. validators . write ( ) . await ;
99- validators. push ( SSVValidator { pubkey : new_pubkey . clone ( ) } ) ;
100- info ! ( "Added new validator {new_pubkey } to the SSV mock server" ) ;
127+ validators. push ( SSVValidator { pubkey : new_mux_pubkey . clone ( ) } ) ;
128+ info ! ( "Added new validator {new_mux_pubkey } to the SSV mock server" ) ;
101129 }
102130
103131 // Wait for the next refresh to complete
104132 tokio:: time:: sleep ( wait_for_refresh_time) . await ;
105133
106134 // Check the logs to ensure the new pubkey was added
107- assert ! ( logs_contain( & format!( "adding new pubkey {new_pubkey} to mux {relay_id}" ) ) ) ;
108- assert ! ( logs_contain( & format!( "fetched 2 pubkeys for registry mux {relay_id}" ) ) ) ;
135+ assert ! ( logs_contain( & format!( "adding new pubkey {new_mux_pubkey} to mux {mux_relay_id}" ) ) ) ;
136+ assert ! ( logs_contain( & format!( "fetched 2 pubkeys for registry mux {mux_relay_id}" ) ) ) ;
137+
138+ // Try to run a get_header on the new pubkey - now it should use the mux relay
139+ let res = mock_validator. do_get_header ( Some ( new_mux_pubkey. clone ( ) ) ) . await ?;
140+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
141+ assert_eq ! ( default_relay_state. received_get_header( ) , 1 ) ; // default relay was not used here
142+ assert_eq ! ( mux_relay_state. received_get_header( ) , 1 ) ; // mux relay was used
143+
144+ // Finally try to do a get_header with the old pubkey - it should only use the
145+ // default relay
146+ let res = mock_validator. do_get_header ( Some ( default_pubkey. clone ( ) ) ) . await ?;
147+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
148+ assert_eq ! ( default_relay_state. received_get_header( ) , 2 ) ; // default relay was used
149+ assert_eq ! ( mux_relay_state. received_get_header( ) , 1 ) ; // mux relay was not used
109150
110151 // Shut down the server handles
111152 pbs_server. abort ( ) ;
112153 ssv_server_handle. abort ( ) ;
154+ default_relay_task. abort ( ) ;
155+ mux_relay_task. abort ( ) ;
113156
114157 Ok ( ( ) )
115158}
0 commit comments