1+ use std:: time:: Duration ;
2+
13use backoff:: { backoff:: Backoff , ExponentialBackoff } ;
24use futures_util:: StreamExt ;
35use pyth_lazer_protocol:: subscription:: {
46 Request , SubscribeRequest , SubscriptionId , UnsubscribeRequest ,
57} ;
6- use tokio:: { pin, select, sync:: mpsc} ;
8+ use tokio:: { pin, select, sync:: mpsc, time :: Instant } ;
79use tracing:: { error, info, warn} ;
810
911use crate :: {
@@ -12,6 +14,8 @@ use crate::{
1214} ;
1315use anyhow:: { bail, Context , Result } ;
1416
17+ const BACKOFF_RESET_DURATION : Duration = Duration :: from_secs ( 10 ) ;
18+
1519pub struct PythLazerResilientWSConnection {
1620 request_sender : mpsc:: Sender < Request > ,
1721}
@@ -83,8 +87,15 @@ impl PythLazerResilientWSConnectionTask {
8387 response_sender : mpsc:: Sender < AnyResponse > ,
8488 request_receiver : & mut mpsc:: Receiver < Request > ,
8589 ) -> Result < ( ) > {
90+ let mut last_failure_time = Instant :: now ( ) ;
91+
8692 loop {
8793 if let Err ( e) = self . start ( response_sender. clone ( ) , request_receiver) . await {
94+ if last_failure_time. elapsed ( ) > BACKOFF_RESET_DURATION {
95+ self . backoff . reset ( ) ;
96+ }
97+ last_failure_time = Instant :: now ( ) ;
98+
8899 let delay = self . backoff . next_backoff ( ) ;
89100 match delay {
90101 Some ( d) => {
0 commit comments