@@ -6,10 +6,8 @@ use crate::{
66} ;
77use anyhow:: { bail, Result } ;
88use backoff:: ExponentialBackoff ;
9- use futures_util:: stream;
109use pyth_lazer_protocol:: subscription:: { SubscribeRequest , SubscriptionId } ;
1110use tokio:: sync:: mpsc:: { self , error:: TrySendError } ;
12- use tokio_stream:: { wrappers:: ReceiverStream , StreamExt } ;
1311use tracing:: { error, warn} ;
1412use ttl_cache:: TtlCache ;
1513use url:: Url ;
@@ -22,7 +20,6 @@ pub struct PythLazerClient {
2220 access_token : String ,
2321 num_connections : usize ,
2422 ws_connections : Vec < PythLazerResilientWSConnection > ,
25- receivers : Vec < mpsc:: Receiver < AnyResponse > > ,
2623 backoff : ExponentialBackoff ,
2724}
2825
@@ -50,33 +47,30 @@ impl PythLazerClient {
5047 access_token,
5148 num_connections,
5249 ws_connections : Vec :: with_capacity ( num_connections) ,
53- receivers : Vec :: with_capacity ( num_connections) ,
5450 backoff,
5551 } )
5652 }
5753
5854 pub async fn start ( & mut self , channel_capacity : usize ) -> Result < mpsc:: Receiver < AnyResponse > > {
5955 let ( sender, receiver) = mpsc:: channel :: < AnyResponse > ( channel_capacity) ;
56+ let ( ws_connection_sender, mut ws_connection_receiver) =
57+ mpsc:: channel :: < AnyResponse > ( CHANNEL_CAPACITY ) ;
6058
6159 for i in 0 ..self . num_connections {
6260 let endpoint = self . endpoints [ i % self . endpoints . len ( ) ] . clone ( ) ;
63- let ( sender, receiver) = mpsc:: channel :: < AnyResponse > ( CHANNEL_CAPACITY ) ;
6461 let connection = PythLazerResilientWSConnection :: new (
6562 endpoint,
6663 self . access_token . clone ( ) ,
6764 self . backoff . clone ( ) ,
68- sender . clone ( ) ,
65+ ws_connection_sender . clone ( ) ,
6966 ) ;
7067 self . ws_connections . push ( connection) ;
71- self . receivers . push ( receiver) ;
7268 }
7369
74- let streams: Vec < _ > = self . receivers . drain ( ..) . map ( ReceiverStream :: new) . collect ( ) ;
75- let mut merged_stream = stream:: select_all ( streams) ;
7670 let mut seen_updates = TtlCache :: new ( DEDUP_CACHE_SIZE ) ;
7771
7872 tokio:: spawn ( async move {
79- while let Some ( response) = merged_stream . next ( ) . await {
73+ while let Some ( response) = ws_connection_receiver . recv ( ) . await {
8074 let cache_key = response. cache_key ( ) ;
8175 if seen_updates. contains_key ( & cache_key) {
8276 continue ;
0 commit comments