Skip to content

Commit 5a3b3c9

Browse files
committed
refactor: move binary msg parsing to protocol crate, improve examples, source token from env for easier testing
1 parent c3d11c3 commit 5a3b3c9

File tree

6 files changed

+145
-106
lines changed

6 files changed

+145
-106
lines changed

lazer/Cargo.lock

Lines changed: 9 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lazer/sdk/rust/client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
1212
futures-util = "0.3"
1313
serde = { version = "1.0", features = ["derive"] }
1414
serde_json = "1.0"
15-
base64 = "0.21"
15+
base64 = "0.22.1"
1616
anyhow = "1.0"
1717
tracing = "0.1"
1818
url = "2.4"

lazer/sdk/rust/client/examples/subscribe_price_feeds.rs

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,49 +6,87 @@ use pyth_lazer_protocol::router::{
66
};
77
use pyth_lazer_protocol::subscription::{Request, SubscribeRequest, SubscriptionId};
88

9+
fn get_lazer_access_token() -> String {
10+
// Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
11+
let token = "your token here";
12+
std::env::var("LAZER_ACCESS_TOKEN").unwrap_or_else(|_| token.to_string())
13+
}
14+
915
#[tokio::main]
1016
async fn main() -> anyhow::Result<()> {
1117
// Create and start the client
1218
let mut client = LazerClient::new(
1319
"wss://pyth-lazer.dourolabs.app/v1/stream",
14-
"YOUR_ACCESS_TOKEN",
20+
&get_lazer_access_token(),
1521
)?;
1622
let mut stream = client.start().await?;
17-
// Create subscription request
18-
let subscription_id = SubscriptionId(1);
19-
let subscription_request = SubscribeRequest {
20-
subscription_id,
21-
params: SubscriptionParams::new(SubscriptionParamsRepr {
22-
price_feed_ids: vec![PriceFeedId(1), PriceFeedId(2), PriceFeedId(3)],
23-
properties: vec![PriceFeedProperty::Price],
24-
chains: vec![Chain::Solana],
25-
delivery_format: DeliveryFormat::Binary,
26-
json_binary_encoding: JsonBinaryEncoding::default(),
27-
parsed: false,
28-
channel: Channel::FixedRate(FixedRate::from_ms(200).expect("unsupported update rate")),
29-
})
30-
.expect("invalid subscription params"),
31-
};
32-
33-
client
34-
.subscribe(Request::Subscribe(subscription_request))
35-
.await?;
36-
37-
println!("Subscribed to BTC/USD price feed. Waiting for updates...");
38-
39-
// Process the first 50 updates
23+
24+
let subscription_requests = vec![
25+
// Example subscription: Parsed JSON feed targeting Solana
26+
SubscribeRequest {
27+
subscription_id: SubscriptionId(1),
28+
params: SubscriptionParams::new(SubscriptionParamsRepr {
29+
price_feed_ids: vec![PriceFeedId(1), PriceFeedId(2)],
30+
properties: vec![
31+
PriceFeedProperty::Price,
32+
PriceFeedProperty::Exponent,
33+
PriceFeedProperty::BestAskPrice,
34+
PriceFeedProperty::BestBidPrice,
35+
],
36+
chains: vec![Chain::Solana],
37+
delivery_format: DeliveryFormat::Json,
38+
json_binary_encoding: JsonBinaryEncoding::default(),
39+
parsed: true,
40+
channel: Channel::FixedRate(
41+
FixedRate::from_ms(200).expect("unsupported update rate"),
42+
),
43+
})
44+
.expect("invalid subscription params"),
45+
},
46+
// Example subscription: binary feed targeting Solana and EVM
47+
SubscribeRequest {
48+
subscription_id: SubscriptionId(2),
49+
params: SubscriptionParams::new(SubscriptionParamsRepr {
50+
price_feed_ids: vec![PriceFeedId(3), PriceFeedId(4)],
51+
properties: vec![
52+
PriceFeedProperty::Price,
53+
PriceFeedProperty::Exponent,
54+
PriceFeedProperty::BestAskPrice,
55+
PriceFeedProperty::BestBidPrice,
56+
],
57+
chains: vec![Chain::Evm, Chain::Solana],
58+
delivery_format: DeliveryFormat::Binary,
59+
json_binary_encoding: JsonBinaryEncoding::default(),
60+
parsed: false,
61+
channel: Channel::FixedRate(
62+
FixedRate::from_ms(50).expect("unsupported update rate"),
63+
),
64+
})
65+
.expect("invalid subscription params"),
66+
},
67+
];
68+
69+
for req in subscription_requests {
70+
client.subscribe(Request::Subscribe(req)).await?;
71+
}
72+
73+
println!("Subscribed to price feeds. Waiting for updates...");
74+
75+
// Process the first few updates
4076
let mut count = 0;
4177
while let Some(msg) = stream.next().await {
4278
println!("Received update: {:?}", msg?);
4379
count += 1;
44-
if count >= 10 {
80+
if count >= 50 {
4581
break;
4682
}
4783
}
4884

4985
// Unsubscribe before exiting
50-
client.unsubscribe(subscription_id).await?;
51-
println!("Unsubscribed from {:?}", subscription_id);
86+
for sub_id in [SubscriptionId(1), SubscriptionId(2)] {
87+
client.unsubscribe(sub_id).await?;
88+
println!("Unsubscribed from {:?}", sub_id);
89+
}
5290

5391
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
5492
client.close().await?;

lazer/sdk/rust/client/src/lib.rs

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,10 @@
11
use anyhow::Result;
2-
use base64::Engine;
32
use futures_util::{SinkExt, StreamExt};
4-
use pyth_lazer_protocol::{
5-
message::{EvmMessage, SolanaMessage},
6-
payload::{
7-
BINARY_UPDATE_FORMAT_MAGIC, EVM_FORMAT_MAGIC, PARSED_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC_BE,
8-
},
9-
router::{JsonBinaryData, JsonBinaryEncoding, JsonUpdate},
10-
subscription::{
11-
ErrorResponse, Request, Response, StreamUpdatedResponse, SubscriptionId, UnsubscribeRequest,
12-
},
3+
use pyth_lazer_protocol::subscription::{
4+
ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest,
135
};
146
use tokio_tungstenite::{connect_async, tungstenite::Message};
157
use url::Url;
16-
17-
/// Response type for binary messages containing chain-specific data
18-
#[derive(Debug)]
19-
pub enum BinaryResponse {
20-
/// EVM chain message with payload and signature
21-
Evm(EvmMessage),
22-
/// Solana chain message with payload and signature
23-
Solana(SolanaMessage),
24-
/// Parsed JSON payload for human-readable format
25-
Parsed(serde_json::Value),
26-
}
27-
288
/// A WebSocket client for consuming Pyth Lazer price feed updates
299
///
3010
/// This client provides a simple interface to:
@@ -86,59 +66,7 @@ impl LazerClient {
8666
let msg = msg?;
8767
match msg {
8868
Message::Text(text) => Ok(serde_json::from_str(&text)?),
89-
Message::Binary(data) => {
90-
let mut pos = 0;
91-
let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?);
92-
pos += 4;
93-
94-
if magic != BINARY_UPDATE_FORMAT_MAGIC {
95-
anyhow::bail!("binary update format magic mismatch");
96-
}
97-
98-
let subscription_id =
99-
SubscriptionId(u64::from_be_bytes(data[pos..pos + 8].try_into()?));
100-
pos += 8;
101-
102-
let mut evm = None;
103-
let mut solana = None;
104-
let mut parsed = None;
105-
106-
while pos < data.len() {
107-
let len = u16::from_be_bytes(data[pos..pos + 2].try_into()?) as usize;
108-
pos += 2;
109-
let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?);
110-
111-
match magic {
112-
EVM_FORMAT_MAGIC => {
113-
evm = Some(EvmMessage::deserialize_slice(&data[pos..pos + len])?);
114-
}
115-
SOLANA_FORMAT_MAGIC_BE => {
116-
solana =
117-
Some(SolanaMessage::deserialize_slice(&data[pos..pos + len])?);
118-
}
119-
PARSED_FORMAT_MAGIC => {
120-
parsed = Some(serde_json::from_slice(&data[pos + 4..pos + len])?);
121-
}
122-
_ => anyhow::bail!("unknown magic: {}", magic),
123-
}
124-
pos += len;
125-
}
126-
127-
Ok(Response::StreamUpdated(StreamUpdatedResponse {
128-
subscription_id,
129-
payload: JsonUpdate {
130-
evm: evm.map(|m| JsonBinaryData {
131-
encoding: JsonBinaryEncoding::Base64,
132-
data: base64::engine::general_purpose::STANDARD.encode(&m.payload),
133-
}),
134-
solana: solana.map(|m| JsonBinaryData {
135-
encoding: JsonBinaryEncoding::Base64,
136-
data: base64::engine::general_purpose::STANDARD.encode(&m.payload),
137-
}),
138-
parsed,
139-
},
140-
}))
141-
}
69+
Message::Binary(data) => Ok(Response::from_binary(&data)?),
14270
Message::Close(_) => Ok(Response::Error(ErrorResponse {
14371
error: "WebSocket connection closed".to_string(),
14472
})),

lazer/sdk/rust/protocol/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ repository = "https://github.com/pyth-network/pyth-crosschain"
1010
byteorder = "1.5.0"
1111
anyhow = "1.0.89"
1212
serde = { version = "1.0.210", features = ["derive"] }
13+
serde_json = "1.0"
1314
derive_more = { version = "1.0.0", features = ["from"] }
1415
itertools = "0.13.0"
1516
rust_decimal = "1.36.0"
17+
base64 = "0.22.1"
1618

1719
[dev-dependencies]
1820
bincode = "1.3.3"

lazer/sdk/rust/protocol/src/subscription.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,16 @@
22
//! used across publishers, agents and routers.
33
44
use {
5-
crate::router::{JsonUpdate, SubscriptionParams},
5+
crate::{
6+
message::{EvmMessage, SolanaMessage},
7+
payload::{
8+
BINARY_UPDATE_FORMAT_MAGIC, EVM_FORMAT_MAGIC, PARSED_FORMAT_MAGIC,
9+
SOLANA_FORMAT_MAGIC_BE,
10+
},
11+
router::{JsonBinaryData, JsonBinaryEncoding, JsonUpdate, SubscriptionParams},
12+
},
13+
anyhow::bail,
14+
base64::Engine,
615
derive_more::From,
716
serde::{Deserialize, Serialize},
817
};
@@ -33,7 +42,7 @@ pub struct UnsubscribeRequest {
3342
pub subscription_id: SubscriptionId,
3443
}
3544

36-
/// A response sent from the server to the client.
45+
/// A JSON response sent from the server to the client.
3746
#[derive(Debug, Clone, Serialize, Deserialize, From)]
3847
#[serde(tag = "type")]
3948
#[serde(rename_all = "camelCase")]
@@ -45,6 +54,60 @@ pub enum Response {
4554
StreamUpdated(StreamUpdatedResponse),
4655
}
4756

57+
impl Response {
58+
/// Parse a binary server message into a Response
59+
pub fn from_binary(data: &[u8]) -> anyhow::Result<Self> {
60+
let mut pos = 0;
61+
let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?);
62+
pos += 4;
63+
64+
if magic != BINARY_UPDATE_FORMAT_MAGIC {
65+
bail!("binary update format magic mismatch");
66+
}
67+
68+
let subscription_id = SubscriptionId(u64::from_be_bytes(data[pos..pos + 8].try_into()?));
69+
pos += 8;
70+
71+
let mut evm = None;
72+
let mut solana = None;
73+
let mut parsed = None;
74+
75+
while pos < data.len() {
76+
let len = u16::from_be_bytes(data[pos..pos + 2].try_into()?) as usize;
77+
pos += 2;
78+
let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?);
79+
80+
match magic {
81+
EVM_FORMAT_MAGIC => {
82+
evm = Some(EvmMessage::deserialize_slice(&data[pos..pos + len])?);
83+
}
84+
SOLANA_FORMAT_MAGIC_BE => {
85+
solana = Some(SolanaMessage::deserialize_slice(&data[pos..pos + len])?);
86+
}
87+
PARSED_FORMAT_MAGIC => {
88+
parsed = Some(serde_json::from_slice(&data[pos + 4..pos + len])?);
89+
}
90+
_ => bail!("unknown magic: {}", magic),
91+
}
92+
pos += len;
93+
}
94+
95+
Ok(Response::StreamUpdated(StreamUpdatedResponse {
96+
subscription_id,
97+
payload: JsonUpdate {
98+
evm: evm.map(|m| JsonBinaryData {
99+
encoding: JsonBinaryEncoding::Base64,
100+
data: base64::engine::general_purpose::STANDARD.encode(&m.payload),
101+
}),
102+
solana: solana.map(|m| JsonBinaryData {
103+
encoding: JsonBinaryEncoding::Base64,
104+
data: base64::engine::general_purpose::STANDARD.encode(&m.payload),
105+
}),
106+
parsed,
107+
},
108+
}))
109+
}
110+
}
48111
/// Sent from the server after a successul subscription.
49112
#[derive(Debug, Clone, Serialize, Deserialize)]
50113
#[serde(rename_all = "camelCase")]

0 commit comments

Comments
 (0)