@@ -95,3 +95,205 @@ async fn test_db_elected_leader_recovery_replica_keeps_running() {
9595 leader. shutdown ( ) . await . unwrap ( ) ;
9696 setup. shutdown ( ) . await ;
9797}
98+
99+ /// Test that when the replica enters recovery state (due to falling behind
100+ /// the deferred slots threshold), the leader continues to function and
101+ /// both nodes recover to normal operation.
102+ #[ tokio:: test( flavor = "multi_thread" ) ]
103+ async fn test_db_elected_replica_recovery_leader_keeps_running ( ) {
104+ std:: env:: set_var ( "SOV_TEST_CONST_OVERRIDE_DEFERRED_SLOTS_COUNT" , "40" ) ;
105+
106+ let Some ( setup) = NodeDiscoveryTestSetup :: new ( ) . await else {
107+ return ;
108+ } ;
109+
110+ let key_and_address = read_private_key :: < S > ( "tx_signer_private_key.json" ) ;
111+
112+ let node_1 = setup
113+ . start_node ( "node_1" , ConfiguredNodeRole :: DbElected )
114+ . await ;
115+ let node_2 = setup
116+ . start_node ( "node_2" , ConfiguredNodeRole :: DbElected )
117+ . await ;
118+
119+ node_1. wait_for_sequencer_ready ( ) . await . unwrap ( ) ;
120+ node_2. wait_for_sequencer_ready ( ) . await . unwrap ( ) ;
121+
122+ let ( leader, replica) = establish_leader_and_replica ( node_1, node_2) . await ;
123+
124+ let token_id = config_gas_token_id ( ) ;
125+ let receiver_addr = random_address ( ) ;
126+
127+ // Pause only the replica's update_state loop to prevent it from processing batches
128+ replica. pause_preferred_batches_for_node ( ) . await ;
129+
130+ // Produce DA blocks while replica is paused to exceed the deferred slots threshold.
131+ // With DEFERRED_SLOTS_COUNT=40, the 90% threshold triggers at ~26 blocks of lag.
132+ for _ in 0 ..30 {
133+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
134+ }
135+
136+ // Wait for the replica to sync the DA blocks
137+ replica. wait_for_node_synced ( ) . await . unwrap ( ) ;
138+
139+ // Resume batch processing; on the next state update the replica should enter recovery
140+ replica. resume_preferred_batches_for_node ( ) . await ;
141+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
142+
143+ // Wait until the replica is no longer ready (entered recovery)
144+ let start = std:: time:: Instant :: now ( ) ;
145+
146+ while replica. is_sequencer_ready ( ) . await {
147+ if start. elapsed ( ) > Duration :: from_secs ( 10 ) {
148+ panic ! ( "Timeout waiting for replica to enter recovery" ) ;
149+ }
150+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
151+ tokio:: time:: sleep ( Duration :: from_millis ( 30 ) ) . await ;
152+ }
153+
154+ let mut was_recovering = false ;
155+
156+ while !replica. is_sequencer_ready ( ) . await {
157+ if replica. is_sequencer_recovering ( ) . await {
158+ was_recovering = true
159+ }
160+
161+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
162+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
163+ }
164+
165+ assert ! ( was_recovering) ;
166+
167+ replica. wait_for_sequencer_ready ( ) . await . unwrap ( ) ;
168+
169+ // Verify the leader is still operational after recovery by sending a new transaction
170+ let tx = build_transfer_token_tx :: < S > (
171+ & key_and_address. private_key ,
172+ token_id,
173+ receiver_addr,
174+ AMOUNT ,
175+ 0 ,
176+ ) ;
177+ leader. send_tx_to_sequencer ( & tx) . await . unwrap ( ) ;
178+
179+ let mut event_subscription = replica
180+ . api_client ( )
181+ . subscribe_to_events_with_filter ( "Bank/*" )
182+ . await
183+ . unwrap ( ) ;
184+
185+ wait_for_all_events_with_timeout ( Duration :: from_millis ( 500 ) , 0 , & mut event_subscription) . await ;
186+
187+ replica. shutdown ( ) . await . unwrap ( ) ;
188+ leader. shutdown ( ) . await . unwrap ( ) ;
189+ setup. shutdown ( ) . await ;
190+ }
191+
192+ /// Test that when both the leader and replica enter recovery state (due to
193+ /// falling behind the deferred slots threshold), both recover to normal
194+ /// operation.
195+ #[ tokio:: test( flavor = "multi_thread" ) ]
196+ async fn test_db_elected_both_nodes_recovery ( ) {
197+ std:: env:: set_var ( "SOV_TEST_CONST_OVERRIDE_DEFERRED_SLOTS_COUNT" , "40" ) ;
198+
199+ let Some ( setup) = NodeDiscoveryTestSetup :: new ( ) . await else {
200+ return ;
201+ } ;
202+
203+ let key_and_address = read_private_key :: < S > ( "tx_signer_private_key.json" ) ;
204+
205+ let node_1 = setup
206+ . start_node ( "node_1" , ConfiguredNodeRole :: DbElected )
207+ . await ;
208+ let node_2 = setup
209+ . start_node ( "node_2" , ConfiguredNodeRole :: DbElected )
210+ . await ;
211+
212+ node_1. wait_for_sequencer_ready ( ) . await . unwrap ( ) ;
213+ node_2. wait_for_sequencer_ready ( ) . await . unwrap ( ) ;
214+
215+ let ( leader, replica) = establish_leader_and_replica ( node_1, node_2) . await ;
216+
217+ let token_id = config_gas_token_id ( ) ;
218+ let receiver_addr = random_address ( ) ;
219+
220+ // Pause both nodes' update_state loops to prevent batch processing
221+ leader. pause_preferred_batches_for_node ( ) . await ;
222+ replica. pause_preferred_batches_for_node ( ) . await ;
223+
224+ // Produce DA blocks while both nodes are paused to exceed the deferred slots threshold.
225+ // With DEFERRED_SLOTS_COUNT=40, the 90% threshold triggers at ~26 blocks of lag.
226+ for _ in 0 ..30 {
227+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
228+ }
229+
230+ // Wait for both nodes to sync the DA blocks
231+ leader. wait_for_node_synced ( ) . await . unwrap ( ) ;
232+ replica. wait_for_node_synced ( ) . await . unwrap ( ) ;
233+
234+ // Resume batch processing on both; on the next state update both should enter recovery
235+ leader. resume_preferred_batches_for_node ( ) . await ;
236+ replica. resume_preferred_batches_for_node ( ) . await ;
237+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
238+
239+ // Wait until the leader is no longer ready (entered recovery)
240+ let start = std:: time:: Instant :: now ( ) ;
241+
242+ while leader. is_sequencer_ready ( ) . await {
243+ if start. elapsed ( ) > Duration :: from_secs ( 10 ) {
244+ panic ! ( "Timeout waiting for leader to enter recovery" ) ;
245+ }
246+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
247+ tokio:: time:: sleep ( Duration :: from_millis ( 30 ) ) . await ;
248+ }
249+
250+ let mut leader_was_recovering = false ;
251+
252+ while !leader. is_sequencer_ready ( ) . await {
253+ if leader. is_sequencer_recovering ( ) . await {
254+ leader_was_recovering = true
255+ }
256+
257+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
258+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
259+ }
260+
261+ assert ! ( leader_was_recovering) ;
262+
263+ leader. wait_for_sequencer_ready ( ) . await . unwrap ( ) ;
264+
265+ // Wait for the replica to also recover
266+ let start = std:: time:: Instant :: now ( ) ;
267+
268+ while !replica. is_sequencer_ready ( ) . await {
269+ if start. elapsed ( ) > Duration :: from_secs ( 30 ) {
270+ panic ! ( "Timeout waiting for replica to recover" ) ;
271+ }
272+ setup. da_service . produce_block_now ( ) . await . unwrap ( ) ;
273+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
274+ }
275+
276+ replica. wait_for_sequencer_ready ( ) . await . unwrap ( ) ;
277+
278+ // Verify the cluster is operational after both nodes recovered
279+ let tx = build_transfer_token_tx :: < S > (
280+ & key_and_address. private_key ,
281+ token_id,
282+ receiver_addr,
283+ AMOUNT ,
284+ 0 ,
285+ ) ;
286+ leader. send_tx_to_sequencer ( & tx) . await . unwrap ( ) ;
287+
288+ let mut event_subscription = replica
289+ . api_client ( )
290+ . subscribe_to_events_with_filter ( "Bank/*" )
291+ . await
292+ . unwrap ( ) ;
293+
294+ wait_for_all_events_with_timeout ( Duration :: from_millis ( 500 ) , 0 , & mut event_subscription) . await ;
295+
296+ replica. shutdown ( ) . await . unwrap ( ) ;
297+ leader. shutdown ( ) . await . unwrap ( ) ;
298+ setup. shutdown ( ) . await ;
299+ }
0 commit comments