Skip to content

Commit 008f025

Browse files
committed
add timeout
1 parent 856a925 commit 008f025

File tree

3 files changed

+26
-5
lines changed

3 files changed

+26
-5
lines changed

lazer/sdk/rust/client/examples/subscribe_price_feeds.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use base64::Engine;
24
use pyth_lazer_client::backoff::PythLazerExponentialBackoffBuilder;
35
use pyth_lazer_client::client::PythLazerClient;
@@ -41,6 +43,7 @@ async fn main() -> anyhow::Result<()> {
4143
get_lazer_access_token(),
4244
4,
4345
PythLazerExponentialBackoffBuilder::default().build(),
46+
Duration::from_secs(5), // Timeout for each connection
4447
)?;
4548

4649
let stream = client

lazer/sdk/rust/client/src/client.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub struct PythLazerClient {
2121
num_connections: usize,
2222
ws_connections: Vec<PythLazerResilientWSConnection>,
2323
backoff: ExponentialBackoff,
24+
timeout: Duration,
2425
}
2526

2627
impl PythLazerClient {
@@ -35,6 +36,7 @@ impl PythLazerClient {
3536
access_token: String,
3637
num_connections: usize,
3738
backoff: ExponentialBackoff,
39+
timeout: Duration,
3840
) -> Result<Self> {
3941
if backoff.max_elapsed_time.is_some() {
4042
bail!("max_elapsed_time is not supported in Pyth Lazer client");
@@ -48,6 +50,7 @@ impl PythLazerClient {
4850
num_connections,
4951
ws_connections: Vec::with_capacity(num_connections),
5052
backoff,
53+
timeout,
5154
})
5255
}
5356

@@ -62,6 +65,7 @@ impl PythLazerClient {
6265
endpoint,
6366
self.access_token.clone(),
6467
self.backoff.clone(),
68+
self.timeout,
6569
ws_connection_sender.clone(),
6670
);
6771
self.ws_connections.push(connection);

lazer/sdk/rust/client/src/resilient_ws_connection.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ impl PythLazerResilientWSConnection {
3535
endpoint: Url,
3636
access_token: String,
3737
backoff: ExponentialBackoff,
38+
timeout: Duration,
3839
sender: mpsc::Sender<AnyResponse>,
3940
) -> Self {
4041
let (request_sender, mut request_receiver) = mpsc::channel(CHANNEL_CAPACITY);
41-
let mut task = PythLazerResilientWSConnectionTask::new(endpoint, access_token, backoff);
42+
let mut task =
43+
PythLazerResilientWSConnectionTask::new(endpoint, access_token, backoff, timeout);
4244

4345
tokio::spawn(async move {
4446
if let Err(e) = task.run(sender, &mut request_receiver).await {
@@ -71,15 +73,22 @@ struct PythLazerResilientWSConnectionTask {
7173
access_token: String,
7274
subscriptions: Vec<SubscribeRequest>,
7375
backoff: ExponentialBackoff,
76+
timeout: Duration,
7477
}
7578

7679
impl PythLazerResilientWSConnectionTask {
77-
pub fn new(endpoint: Url, access_token: String, backoff: ExponentialBackoff) -> Self {
80+
pub fn new(
81+
endpoint: Url,
82+
access_token: String,
83+
backoff: ExponentialBackoff,
84+
timeout: Duration,
85+
) -> Self {
7886
Self {
7987
endpoint,
8088
access_token,
8189
subscriptions: Vec::new(),
8290
backoff,
91+
timeout,
8392
}
8493
}
8594

@@ -130,10 +139,12 @@ impl PythLazerResilientWSConnectionTask {
130139
.await?;
131140
}
132141
loop {
142+
let timeout_response = tokio::time::timeout(self.timeout, stream.next());
143+
133144
select! {
134-
response = stream.next() => {
145+
response = timeout_response => {
135146
match response {
136-
Some(response) => match response {
147+
Ok(Some(response)) => match response {
137148
Ok(response) => {
138149
sender
139150
.send(response)
@@ -144,9 +155,12 @@ impl PythLazerResilientWSConnectionTask {
144155
bail!("WebSocket stream error: {}", e);
145156
}
146157
},
147-
None => {
158+
Ok(None) => {
148159
bail!("WebSocket stream ended unexpectedly");
149160
}
161+
Err(_elapsed) => {
162+
bail!("WebSocket stream timed out");
163+
}
150164
}
151165
}
152166
Some(request) = request_receiver.recv() => {

0 commit comments

Comments
 (0)