Skip to content

Commit 0f612e7

Browse files
committed
configurable backoff
1 parent 0849884 commit 0f612e7

File tree

3 files changed

+32
-14
lines changed

3 files changed

+32
-14
lines changed

lazer/sdk/rust/client/examples/subscribe_price_feeds.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use backoff::ExponentialBackoffBuilder;
12
use base64::Engine;
23
use pyth_lazer_client::client::PythLazerClient;
34
use pyth_lazer_client::ws_connection::AnyResponse;
@@ -36,7 +37,10 @@ async fn main() -> anyhow::Result<()> {
3637
vec!["wss://pyth-lazer.dourolabs.app/v1/stream".to_string()],
3738
get_lazer_access_token(),
3839
1,
39-
);
40+
ExponentialBackoffBuilder::default()
41+
.with_max_elapsed_time(None) // max_elapsed_time is not supported in Pyth Lazer client
42+
.build(),
43+
)?;
4044

4145
let stream = client.start().await?;
4246
pin!(stream);

lazer/sdk/rust/client/src/client.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use crate::{
22
resilient_ws_connection::PythLazerResilientWSConnection, ws_connection::AnyResponse,
33
CHANNEL_CAPACITY,
44
};
5-
use anyhow::Result;
5+
use anyhow::{bail, Result};
6+
use backoff::ExponentialBackoff;
67
use futures_util::stream;
78
use pyth_lazer_protocol::subscription::{SubscribeRequest, SubscriptionId};
89
use tokio::sync::mpsc::{self, error::TrySendError};
@@ -15,6 +16,7 @@ pub struct PythLazerClient {
1516
num_connections: usize,
1617
ws_connections: Vec<PythLazerResilientWSConnection>,
1718
receivers: Vec<mpsc::Receiver<AnyResponse>>,
19+
backoff: ExponentialBackoff,
1820
}
1921

2022
impl PythLazerClient {
@@ -24,14 +26,23 @@ impl PythLazerClient {
2426
/// * `endpoints` - A vector of endpoint URLs
2527
/// * `access_token` - The access token for authentication
2628
/// * `num_connections` - The number of WebSocket connections to maintain
27-
pub fn new(endpoints: Vec<String>, access_token: String, num_connections: usize) -> Self {
28-
Self {
29+
pub fn new(
30+
endpoints: Vec<String>,
31+
access_token: String,
32+
num_connections: usize,
33+
backoff: ExponentialBackoff,
34+
) -> Result<Self> {
35+
if backoff.max_elapsed_time.is_some() {
36+
bail!("max_elapsed_time is not supported in Pyth Lazer client");
37+
}
38+
Ok(Self {
2939
endpoints,
3040
access_token,
3141
num_connections,
3242
ws_connections: Vec::with_capacity(num_connections),
3343
receivers: Vec::with_capacity(num_connections),
34-
}
44+
backoff,
45+
})
3546
}
3647

3748
pub async fn start(&mut self) -> Result<mpsc::Receiver<AnyResponse>> {
@@ -43,6 +54,7 @@ impl PythLazerClient {
4354
let connection = PythLazerResilientWSConnection::new(
4455
endpoint,
4556
self.access_token.clone(),
57+
self.backoff.clone(),
4658
sender.clone(),
4759
);
4860
self.ws_connections.push(connection);

lazer/sdk/rust/client/src/resilient_ws_connection.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use backoff::{
2-
backoff::Backoff, exponential::ExponentialBackoff, ExponentialBackoffBuilder, SystemClock,
3-
};
1+
use backoff::{backoff::Backoff, ExponentialBackoff};
42
use futures_util::StreamExt;
53
use pyth_lazer_protocol::subscription::{
64
Request, SubscribeRequest, SubscriptionId, UnsubscribeRequest,
@@ -28,9 +26,14 @@ impl PythLazerResilientWSConnection {
2826
///
2927
/// # Returns
3028
/// Returns a new client instance (not yet connected)
31-
pub fn new(endpoint: String, access_token: String, sender: mpsc::Sender<AnyResponse>) -> Self {
29+
pub fn new(
30+
endpoint: String,
31+
access_token: String,
32+
backoff: ExponentialBackoff,
33+
sender: mpsc::Sender<AnyResponse>,
34+
) -> Self {
3235
let (request_sender, mut request_receiver) = mpsc::channel(CHANNEL_CAPACITY);
33-
let mut task = PythLazerResilientWSConnectionTask::new(endpoint, access_token);
36+
let mut task = PythLazerResilientWSConnectionTask::new(endpoint, access_token, backoff);
3437

3538
tokio::spawn(async move {
3639
if let Err(e) = task.run(sender, &mut request_receiver).await {
@@ -62,17 +65,16 @@ struct PythLazerResilientWSConnectionTask {
6265
endpoint: String,
6366
access_token: String,
6467
subscriptions: Vec<SubscribeRequest>,
65-
backoff: ExponentialBackoff<SystemClock>,
68+
backoff: ExponentialBackoff,
6669
}
6770

6871
impl PythLazerResilientWSConnectionTask {
69-
pub fn new(endpoint: String, access_token: String) -> Self {
72+
pub fn new(endpoint: String, access_token: String, backoff: ExponentialBackoff) -> Self {
7073
Self {
7174
endpoint,
7275
access_token,
7376
subscriptions: Vec::new(),
74-
// TODO: make backoff configurable
75-
backoff: ExponentialBackoffBuilder::new().build(),
77+
backoff,
7678
}
7779
}
7880

0 commit comments

Comments
 (0)