Skip to content

Commit 0849884

Browse files
committed
feat(lazer): add resilient client in rust
1 parent 4871ad3 commit 0849884

File tree

7 files changed

+439
-151
lines changed

7 files changed

+439
-151
lines changed

Cargo.lock

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lazer/sdk/rust/client/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
[package]
22
name = "pyth-lazer-client"
3-
version = "0.1.3"
3+
version = "1.0.0"
44
edition = "2021"
55
description = "A Rust client for Pyth Lazer"
66
license = "Apache-2.0"
77

88
[dependencies]
99
pyth-lazer-protocol = { path = "../protocol", version = "0.9.0" }
1010
tokio = { version = "1", features = ["full"] }
11+
tokio-stream = "0.1.17"
1112
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
1213
futures-util = "0.3"
1314
serde = { version = "1.0", features = ["derive"] }
@@ -17,6 +18,7 @@ anyhow = "1.0"
1718
tracing = "0.1"
1819
url = "2.4"
1920
derive_more = { version = "1.0.0", features = ["from"] }
21+
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
2022

2123
[dev-dependencies]
2224
bincode = "1.3.3"
@@ -25,3 +27,4 @@ hex = "0.4.3"
2527
libsecp256k1 = "0.7.1"
2628
bs58 = "0.5.1"
2729
alloy-primitives = "0.8.19"
30+
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }

lazer/sdk/rust/client/examples/subscribe_price_feeds.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use base64::Engine;
2-
use futures_util::StreamExt;
3-
use pyth_lazer_client::{AnyResponse, LazerClient};
2+
use pyth_lazer_client::client::PythLazerClient;
3+
use pyth_lazer_client::ws_connection::AnyResponse;
44
use pyth_lazer_protocol::message::{
55
EvmMessage, LeEcdsaMessage, LeUnsignedMessage, Message, SolanaMessage,
66
};
@@ -9,8 +9,10 @@ use pyth_lazer_protocol::router::{
99
Channel, DeliveryFormat, FixedRate, Format, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
1010
SubscriptionParams, SubscriptionParamsRepr,
1111
};
12-
use pyth_lazer_protocol::subscription::{Request, Response, SubscribeRequest, SubscriptionId};
12+
use pyth_lazer_protocol::subscription::{Response, SubscribeRequest, SubscriptionId};
1313
use tokio::pin;
14+
use tracing::level_filters::LevelFilter;
15+
use tracing_subscriber::EnvFilter;
1416

1517
fn get_lazer_access_token() -> String {
1618
// Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
@@ -20,11 +22,22 @@ fn get_lazer_access_token() -> String {
2022

2123
#[tokio::main]
2224
async fn main() -> anyhow::Result<()> {
25+
tracing_subscriber::fmt()
26+
.with_env_filter(
27+
EnvFilter::builder()
28+
.with_default_directive(LevelFilter::INFO.into())
29+
.from_env()?,
30+
)
31+
.json()
32+
.init();
33+
2334
// Create and start the client
24-
let mut client = LazerClient::new(
25-
"wss://pyth-lazer.dourolabs.app/v1/stream",
26-
&get_lazer_access_token(),
27-
)?;
35+
let mut client = PythLazerClient::new(
36+
vec!["wss://pyth-lazer.dourolabs.app/v1/stream".to_string()],
37+
get_lazer_access_token(),
38+
1,
39+
);
40+
2841
let stream = client.start().await?;
2942
pin!(stream);
3043

@@ -72,16 +85,16 @@ async fn main() -> anyhow::Result<()> {
7285
];
7386

7487
for req in subscription_requests {
75-
client.subscribe(Request::Subscribe(req)).await?;
88+
client.subscribe(req).await?;
7689
}
7790

7891
println!("Subscribed to price feeds. Waiting for updates...");
7992

8093
// Process the first few updates
8194
let mut count = 0;
82-
while let Some(msg) = stream.next().await {
95+
while let Some(msg) = stream.recv().await {
8396
// The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
84-
match msg? {
97+
match msg {
8598
AnyResponse::Json(msg) => match msg {
8699
Response::StreamUpdated(update) => {
87100
println!("Received a JSON update for {:?}", update.subscription_id);
@@ -189,8 +202,6 @@ async fn main() -> anyhow::Result<()> {
189202
println!("Unsubscribed from {sub_id:?}");
190203
}
191204

192-
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
193-
client.close().await?;
194205
Ok(())
195206
}
196207

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use crate::{
2+
resilient_ws_connection::PythLazerResilientWSConnection, ws_connection::AnyResponse,
3+
CHANNEL_CAPACITY,
4+
};
5+
use anyhow::Result;
6+
use futures_util::stream;
7+
use pyth_lazer_protocol::subscription::{SubscribeRequest, SubscriptionId};
8+
use tokio::sync::mpsc::{self, error::TrySendError};
9+
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
10+
use tracing::{error, warn};
11+
12+
pub struct PythLazerClient {
13+
endpoints: Vec<String>,
14+
access_token: String,
15+
num_connections: usize,
16+
ws_connections: Vec<PythLazerResilientWSConnection>,
17+
receivers: Vec<mpsc::Receiver<AnyResponse>>,
18+
}
19+
20+
impl PythLazerClient {
21+
/// Creates a new client instance
22+
///
23+
/// # Arguments
24+
/// * `endpoints` - A vector of endpoint URLs
25+
/// * `access_token` - The access token for authentication
26+
/// * `num_connections` - The number of WebSocket connections to maintain
27+
pub fn new(endpoints: Vec<String>, access_token: String, num_connections: usize) -> Self {
28+
Self {
29+
endpoints,
30+
access_token,
31+
num_connections,
32+
ws_connections: Vec::with_capacity(num_connections),
33+
receivers: Vec::with_capacity(num_connections),
34+
}
35+
}
36+
37+
pub async fn start(&mut self) -> Result<mpsc::Receiver<AnyResponse>> {
38+
let (sender, receiver) = mpsc::channel::<AnyResponse>(CHANNEL_CAPACITY);
39+
40+
for i in 0..self.num_connections {
41+
let endpoint = self.endpoints[i % self.endpoints.len()].clone();
42+
let (sender, receiver) = mpsc::channel::<AnyResponse>(CHANNEL_CAPACITY);
43+
let connection = PythLazerResilientWSConnection::new(
44+
endpoint,
45+
self.access_token.clone(),
46+
sender.clone(),
47+
);
48+
self.ws_connections.push(connection);
49+
self.receivers.push(receiver);
50+
}
51+
52+
let streams: Vec<_> = self.receivers.drain(..).map(ReceiverStream::new).collect();
53+
let mut merged_stream = stream::select_all(streams);
54+
55+
tokio::spawn(async move {
56+
while let Some(response) = merged_stream.next().await {
57+
match sender.try_send(response) {
58+
Ok(_) => (),
59+
Err(TrySendError::Full(r)) => {
60+
warn!("Sender channel is full, responses will be delayed");
61+
if sender.send(r).await.is_err() {
62+
error!("Sender channel is closed, stopping client");
63+
}
64+
}
65+
Err(TrySendError::Closed(_)) => {
66+
error!("Sender channel is closed, stopping client");
67+
}
68+
}
69+
}
70+
});
71+
72+
Ok(receiver)
73+
}
74+
75+
pub async fn subscribe(&mut self, subscribe_request: SubscribeRequest) -> Result<()> {
76+
for connection in &mut self.ws_connections {
77+
connection.subscribe(subscribe_request.clone()).await?;
78+
}
79+
Ok(())
80+
}
81+
82+
pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> {
83+
for connection in &mut self.ws_connections {
84+
connection.unsubscribe(subscription_id).await?;
85+
}
86+
Ok(())
87+
}
88+
}

lazer/sdk/rust/client/src/lib.rs

Lines changed: 4 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -1,138 +1,5 @@
1-
use anyhow::Result;
2-
use derive_more::From;
3-
use futures_util::{SinkExt, StreamExt, TryStreamExt};
4-
use pyth_lazer_protocol::{
5-
binary_update::BinaryWsUpdate,
6-
subscription::{ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest},
7-
};
8-
use tokio_tungstenite::{connect_async, tungstenite::Message};
9-
use url::Url;
1+
const CHANNEL_CAPACITY: usize = 1000;
102

11-
/// A WebSocket client for consuming Pyth Lazer price feed updates
12-
///
13-
/// This client provides a simple interface to:
14-
/// - Connect to a Lazer WebSocket endpoint
15-
/// - Subscribe to price feed updates
16-
/// - Receive updates as a stream of messages
17-
///
18-
pub struct LazerClient {
19-
endpoint: Url,
20-
access_token: String,
21-
ws_sender: Option<
22-
futures_util::stream::SplitSink<
23-
tokio_tungstenite::WebSocketStream<
24-
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
25-
>,
26-
Message,
27-
>,
28-
>,
29-
}
30-
31-
#[derive(Debug, Clone, PartialEq, Eq, Hash, From)]
32-
pub enum AnyResponse {
33-
Json(Response),
34-
Binary(BinaryWsUpdate),
35-
}
36-
37-
impl LazerClient {
38-
/// Creates a new Lazer client instance
39-
///
40-
/// # Arguments
41-
/// * `endpoint` - The WebSocket URL of the Lazer service
42-
/// * `access_token` - Access token for authentication
43-
///
44-
/// # Returns
45-
/// Returns a new client instance (not yet connected)
46-
pub fn new(endpoint: &str, access_token: &str) -> Result<Self> {
47-
let endpoint = Url::parse(endpoint)?;
48-
let access_token = access_token.to_string();
49-
Ok(Self {
50-
endpoint,
51-
access_token,
52-
ws_sender: None,
53-
})
54-
}
55-
56-
/// Starts the WebSocket connection
57-
///
58-
/// # Returns
59-
/// Returns a stream of responses from the server
60-
pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<AnyResponse>>> {
61-
let url = self.endpoint.clone();
62-
let mut request =
63-
tokio_tungstenite::tungstenite::client::IntoClientRequest::into_client_request(url)?;
64-
65-
request.headers_mut().insert(
66-
"Authorization",
67-
format!("Bearer {}", self.access_token).parse().unwrap(),
68-
);
69-
70-
let (ws_stream, _) = connect_async(request).await?;
71-
let (ws_sender, ws_receiver) = ws_stream.split();
72-
73-
self.ws_sender = Some(ws_sender);
74-
let response_stream =
75-
ws_receiver
76-
.map_err(anyhow::Error::from)
77-
.try_filter_map(|msg| async {
78-
let r: Result<Option<AnyResponse>> = match msg {
79-
Message::Text(text) => {
80-
Ok(Some(serde_json::from_str::<Response>(&text)?.into()))
81-
}
82-
Message::Binary(data) => {
83-
Ok(Some(BinaryWsUpdate::deserialize_slice(&data)?.into()))
84-
}
85-
Message::Close(_) => Ok(Some(
86-
Response::Error(ErrorResponse {
87-
error: "WebSocket connection closed".to_string(),
88-
})
89-
.into(),
90-
)),
91-
_ => Ok(None),
92-
};
93-
r
94-
});
95-
96-
Ok(response_stream)
97-
}
98-
99-
/// Subscribes to price feed updates
100-
///
101-
/// # Arguments
102-
/// * `request` - A subscription request containing feed IDs and parameters
103-
pub async fn subscribe(&mut self, request: Request) -> Result<()> {
104-
if let Some(sender) = &mut self.ws_sender {
105-
let msg = serde_json::to_string(&request)?;
106-
sender.send(Message::Text(msg)).await?;
107-
Ok(())
108-
} else {
109-
anyhow::bail!("WebSocket connection not started")
110-
}
111-
}
112-
113-
/// Unsubscribes from a previously subscribed feed
114-
///
115-
/// # Arguments
116-
/// * `subscription_id` - The ID of the subscription to cancel
117-
pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> {
118-
if let Some(sender) = &mut self.ws_sender {
119-
let request = Request::Unsubscribe(UnsubscribeRequest { subscription_id });
120-
let msg = serde_json::to_string(&request)?;
121-
sender.send(Message::Text(msg)).await?;
122-
Ok(())
123-
} else {
124-
anyhow::bail!("WebSocket connection not started")
125-
}
126-
}
127-
128-
/// Closes the WebSocket connection
129-
pub async fn close(&mut self) -> Result<()> {
130-
if let Some(sender) = &mut self.ws_sender {
131-
sender.send(Message::Close(None)).await?;
132-
self.ws_sender = None;
133-
Ok(())
134-
} else {
135-
anyhow::bail!("WebSocket connection not started")
136-
}
137-
}
138-
}
3+
pub mod client;
4+
pub mod resilient_ws_connection;
5+
pub mod ws_connection;

0 commit comments

Comments
 (0)