Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lazer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion lazer/sdk/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "A Rust client for Pyth Lazer"
license = "Apache-2.0"

[dependencies]
pyth-lazer-protocol = "0.5.0"
pyth-lazer-protocol = { path = "../protocol", version = "0.5.0" }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this break the publishing to cargo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, cargo is smart enough to remove paths on publish.

tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "0.3"
Expand All @@ -16,6 +16,7 @@ base64 = "0.22.1"
anyhow = "1.0"
tracing = "0.1"
url = "2.4"
derive_more = { version = "1.0.0", features = ["from"] }

[dev-dependencies]
bincode = "1.3.3"
Expand Down
155 changes: 119 additions & 36 deletions lazer/sdk/rust/client/examples/subscribe_price_feeds.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use base64::Engine;
use futures_util::StreamExt;
use pyth_lazer_client::LazerClient;
use pyth_lazer_protocol::message::{EvmMessage, SolanaMessage};
use pyth_lazer_client::{AnyResponse, LazerClient};
use pyth_lazer_protocol::message::{
EvmMessage, LeEcdsaMessage, LeUnsignedMessage, Message, SolanaMessage,
};
use pyth_lazer_protocol::payload::PayloadData;
use pyth_lazer_protocol::router::{
Chain, Channel, DeliveryFormat, FixedRate, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
Channel, DeliveryFormat, FixedRate, Format, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
SubscriptionParams, SubscriptionParamsRepr,
};
use pyth_lazer_protocol::subscription::{Request, Response, SubscribeRequest, SubscriptionId};
use tokio::pin;

fn get_lazer_access_token() -> String {
// Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
Expand All @@ -22,7 +25,8 @@ async fn main() -> anyhow::Result<()> {
"wss://pyth-lazer.dourolabs.app/v1/stream",
&get_lazer_access_token(),
)?;
let mut stream = client.start().await?;
let stream = client.start().await?;
pin!(stream);

let subscription_requests = vec![
// Example subscription: Parsed JSON feed targeting Solana
Expand All @@ -36,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
PriceFeedProperty::BestAskPrice,
PriceFeedProperty::BestBidPrice,
],
chains: vec![Chain::Solana],
formats: vec![Format::Solana],
delivery_format: DeliveryFormat::Json,
json_binary_encoding: JsonBinaryEncoding::Base64,
parsed: true,
Expand All @@ -57,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
PriceFeedProperty::BestAskPrice,
PriceFeedProperty::BestBidPrice,
],
chains: vec![Chain::Evm, Chain::Solana],
formats: vec![Format::Evm, Format::Solana],
delivery_format: DeliveryFormat::Binary,
json_binary_encoding: JsonBinaryEncoding::Base64,
parsed: false,
Expand All @@ -80,49 +84,108 @@ async fn main() -> anyhow::Result<()> {
while let Some(msg) = stream.next().await {
// The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
match msg? {
Response::StreamUpdated(update) => {
if let Some(evm_data) = update.payload.evm {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
let evm_message = EvmMessage::deserialize_slice(&binary_data)?;

// Parse and verify the EVM message
let payload = parse_and_verify_evm_message(&evm_message);
println!("EVM payload: {payload:?}\n");
}
AnyResponse::Json(msg) => match msg {
Response::StreamUpdated(update) => {
println!("Received a JSON update for {:?}", update.subscription_id);
if let Some(evm_data) = update.payload.evm {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
let evm_message = EvmMessage::deserialize_slice(&binary_data)?;

// Parse and verify the EVM message
let payload = parse_and_verify_evm_message(&evm_message);
println!("EVM payload: {payload:?}");
}

if let Some(solana_data) = update.payload.solana {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;
if let Some(solana_data) = update.payload.solana {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;

// Parse and verify the Solana message
let payload = parse_and_verify_solana_message(&solana_message);
println!("Solana payload: {payload:?}\n");
}
// Parse and verify the Solana message
let payload = parse_and_verify_solana_message(&solana_message);
println!("Solana payload: {payload:?}");
}

if let Some(parsed) = update.payload.parsed {
// Parsed payloads (`parsed: true`) are already decoded and ready to use
for feed in parsed.price_feeds {
println!(
"Parsed payload: {:?}: {:?} at {:?}\n",
feed.price_feed_id, feed, parsed.timestamp_us
);
if let Some(data) = update.payload.le_ecdsa {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&data.data)?;
let message = LeEcdsaMessage::deserialize_slice(&binary_data)?;

// Parse and verify the message
let payload = parse_and_verify_le_ecdsa_message(&message);
println!("LeEcdsa payload: {payload:?}");
}

if let Some(data) = update.payload.le_unsigned {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&data.data)?;
let message = LeUnsignedMessage::deserialize_slice(&binary_data)?;

// Parse the message
let payload = PayloadData::deserialize_slice_le(&message.payload)?;
println!("LE unsigned payload: {payload:?}");
}

if let Some(parsed) = update.payload.parsed {
// Parsed payloads (`parsed: true`) are already decoded and ready to use
for feed in parsed.price_feeds {
println!(
"Parsed payload: {:?}: {:?} at {:?}",
feed.price_feed_id, feed, parsed.timestamp_us
);
}
}
}
msg => println!("Received non-update message: {msg:?}"),
},
AnyResponse::Binary(msg) => {
println!("Received a binary update for {:?}", msg.subscription_id);
for message in msg.messages {
match message {
Message::Evm(message) => {
// Parse and verify the EVM message
let payload = parse_and_verify_evm_message(&message);
println!("EVM payload: {payload:?}");
}
Message::Solana(message) => {
// Parse and verify the Solana message
let payload = parse_and_verify_solana_message(&message);
println!("Solana payload: {payload:?}");
}
Message::LeEcdsa(message) => {
let payload = parse_and_verify_le_ecdsa_message(&message);
println!("LeEcdsa payload: {payload:?}");
}
Message::LeUnsigned(message) => {
let payload = PayloadData::deserialize_slice_le(&message.payload)?;
println!("LeUnsigned payload: {payload:?}");
}
Message::Json(message) => {
for feed in message.price_feeds {
println!(
"JSON payload: {:?}: {:?} at {:?}",
feed.price_feed_id, feed, message.timestamp_us
);
}
}
}
}
}
_ => println!("Received non-update message"),
}
println!();

count += 1;
if count >= 50 {
break;
}
}

// Unsubscribe before exiting
// Unsubscribe example
for sub_id in [SubscriptionId(1), SubscriptionId(2)] {
client.unsubscribe(sub_id).await?;
println!("Unsubscribed from {:?}", sub_id);
Expand All @@ -147,12 +210,32 @@ fn parse_and_verify_solana_message(solana_message: &SolanaMessage) -> anyhow::Re

fn parse_and_verify_evm_message(evm_message: &EvmMessage) -> anyhow::Result<PayloadData> {
// Recover pubkey from message
libsecp256k1::recover(
let public_key = libsecp256k1::recover(
&libsecp256k1::Message::parse(&alloy_primitives::keccak256(&evm_message.payload)),
&libsecp256k1::Signature::parse_standard(&evm_message.signature)?,
&libsecp256k1::RecoveryId::parse(evm_message.recovery_id)?,
)?;
println!(
"evm address recovered from signature: {:?}",
hex::encode(&alloy_primitives::keccak256(&public_key.serialize()[1..])[12..])
);

let payload = PayloadData::deserialize_slice_be(&evm_message.payload)?;
Ok(payload)
}

fn parse_and_verify_le_ecdsa_message(message: &LeEcdsaMessage) -> anyhow::Result<PayloadData> {
// Recover pubkey from message
let public_key = libsecp256k1::recover(
&libsecp256k1::Message::parse(&alloy_primitives::keccak256(&message.payload)),
&libsecp256k1::Signature::parse_standard(&message.signature)?,
&libsecp256k1::RecoveryId::parse(message.recovery_id)?,
)?;
println!(
"evm address recovered from signature: {:?}",
hex::encode(&alloy_primitives::keccak256(&public_key.serialize()[1..])[12..])
);

let payload = PayloadData::deserialize_slice_le(&message.payload)?;
Ok(payload)
}
51 changes: 34 additions & 17 deletions lazer/sdk/rust/client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use pyth_lazer_protocol::subscription::{
ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest,
use derive_more::From;
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use pyth_lazer_protocol::{
binary_update::BinaryWsUpdate,
subscription::{ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest},
};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use url::Url;

/// A WebSocket client for consuming Pyth Lazer price feed updates
///
/// This client provides a simple interface to:
Expand All @@ -25,6 +28,12 @@ pub struct LazerClient {
>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, From)]
pub enum AnyResponse {
Json(Response),
Binary(BinaryWsUpdate),
}

impl LazerClient {
/// Creates a new Lazer client instance
///
Expand All @@ -48,7 +57,7 @@ impl LazerClient {
///
/// # Returns
/// Returns a stream of responses from the server
pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<Response>>> {
pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<AnyResponse>>> {
let url = self.endpoint.clone();
let mut request =
tokio_tungstenite::tungstenite::client::IntoClientRequest::into_client_request(url)?;
Expand All @@ -62,19 +71,27 @@ impl LazerClient {
let (ws_sender, ws_receiver) = ws_stream.split();

self.ws_sender = Some(ws_sender);
let response_stream = ws_receiver.map(|msg| -> Result<Response> {
let msg = msg?;
match msg {
Message::Text(text) => Ok(serde_json::from_str(&text)?),
Message::Binary(data) => Ok(Response::from_binary(&data)?),
Message::Close(_) => Ok(Response::Error(ErrorResponse {
error: "WebSocket connection closed".to_string(),
})),
_ => Ok(Response::Error(ErrorResponse {
error: "Unexpected message type".to_string(),
})),
}
});
let response_stream =
ws_receiver
.map_err(anyhow::Error::from)
.try_filter_map(|msg| async {
let r: Result<Option<AnyResponse>> = match msg {
Message::Text(text) => {
Ok(Some(serde_json::from_str::<Response>(&text)?.into()))
}
Message::Binary(data) => {
Ok(Some(BinaryWsUpdate::deserialize_slice(&data)?.into()))
}
Message::Close(_) => Ok(Some(
Response::Error(ErrorResponse {
error: "WebSocket connection closed".to_string(),
})
.into(),
)),
_ => Ok(None),
};
r
});

Ok(response_stream)
}
Expand Down
13 changes: 8 additions & 5 deletions lazer/sdk/rust/protocol/examples/parse_and_verify.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use {
anyhow::bail,
byteorder::{ReadBytesExt, BE},
byteorder::{ReadBytesExt, LE},
pyth_lazer_protocol::{
message::{EvmMessage, SolanaMessage},
payload::{PayloadData, EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC_BE},
message::{
format_magics_le::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC},
EvmMessage, SolanaMessage,
},
payload::PayloadData,
},
std::io::{stdin, BufRead, Cursor},
};
Expand All @@ -12,8 +15,8 @@ fn main() -> anyhow::Result<()> {
println!("Reading hex encoded payloads from stdin...");
for line in stdin().lock().lines() {
let message = hex::decode(line?.trim())?;
let magic = Cursor::new(&message).read_u32::<BE>()?;
if magic == SOLANA_FORMAT_MAGIC_BE {
let magic = Cursor::new(&message).read_u32::<LE>()?;
if magic == SOLANA_FORMAT_MAGIC {
println!("this is a solana payload");
let message = SolanaMessage::deserialize_slice(&message)?;
println!(
Expand Down
6 changes: 4 additions & 2 deletions lazer/sdk/rust/protocol/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use serde::{Deserialize, Serialize};

use crate::router::{
Chain, Channel, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty,
Channel, Format, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty,
};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LatestPriceRequest {
pub price_feed_ids: Vec<PriceFeedId>,
pub properties: Vec<PriceFeedProperty>,
pub chains: Vec<Chain>,
// "chains" was renamed to "formats". "chains" is still supported for compatibility.
#[serde(alias = "chains")]
pub formats: Vec<Format>,
#[serde(default)]
pub json_binary_encoding: JsonBinaryEncoding,
/// If `true`, the stream update will contain a JSON object containing
Expand Down
Loading
Loading