@@ -4,6 +4,7 @@ use super::{metrics::FlashblocksWsInboundMetrics, primitives::FlashblocksPayload
44use futures:: { SinkExt , StreamExt } ;
55use tokio:: { sync:: mpsc, time:: interval} ;
66use tokio_tungstenite:: { connect_async, tungstenite:: Message } ;
7+ use tokio_util:: sync:: CancellationToken ;
78use tracing:: { error, info} ;
89use url:: Url ;
910
@@ -68,6 +69,9 @@ impl FlashblocksReceiverService {
6869 info ! ( "Connected to Flashblocks receiver at {}" , self . url) ;
6970 self . metrics . connection_status . set ( 1 ) ;
7071
72+ let cancel_token = CancellationToken :: new ( ) ;
73+ let cancel_for_ping = cancel_token. clone ( ) ;
74+
7175 let ping_task = tokio:: spawn ( async move {
7276 let mut ping_interval = interval ( Duration :: from_millis ( 500 ) ) ;
7377
@@ -78,14 +82,18 @@ impl FlashblocksReceiverService {
7882 return Err ( FlashblocksReceiverError :: PingFailed ) ;
7983 }
8084 }
85+ _ = cancel_for_ping. cancelled( ) => {
86+ tracing:: debug!( "Ping task cancelled" ) ;
87+ return Ok ( ( ) ) ;
88+ }
8189 }
8290 }
8391 } ) ;
8492
8593 let sender = self . sender . clone ( ) ;
8694 let metrics = self . metrics . clone ( ) ;
8795
88- let read_timeout = Duration :: from_millis ( 500 ) ;
96+ let read_timeout = Duration :: from_millis ( 1500 ) ;
8997 let message_handle = tokio:: spawn ( async move {
9098 loop {
9199 let result = tokio:: time:: timeout ( read_timeout, read. next ( ) )
@@ -128,6 +136,7 @@ impl FlashblocksReceiverService {
128136 } ,
129137 } ;
130138
139+ cancel_token. cancel ( ) ;
131140 result
132141 }
133142}
0 commit comments