Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lazer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion lazer/sdk/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "A Rust client for Pyth Lazer"
license = "Apache-2.0"

[dependencies]
pyth-lazer-protocol = "0.5.0"
pyth-lazer-protocol = { path = "../protocol", version = "0.6.0" }
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "0.3"
Expand All @@ -16,6 +16,7 @@ base64 = "0.22.1"
anyhow = "1.0"
tracing = "0.1"
url = "2.4"
derive_more = { version = "1.0.0", features = ["from"] }

[dev-dependencies]
bincode = "1.3.3"
Expand Down
155 changes: 119 additions & 36 deletions lazer/sdk/rust/client/examples/subscribe_price_feeds.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use base64::Engine;
use futures_util::StreamExt;
use pyth_lazer_client::LazerClient;
use pyth_lazer_protocol::message::{EvmMessage, SolanaMessage};
use pyth_lazer_client::{AnyResponse, LazerClient};
use pyth_lazer_protocol::message::{
EvmMessage, LeEcdsaMessage, LeUnsignedMessage, Message, SolanaMessage,
};
use pyth_lazer_protocol::payload::PayloadData;
use pyth_lazer_protocol::router::{
Chain, Channel, DeliveryFormat, FixedRate, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
Channel, DeliveryFormat, FixedRate, Format, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
SubscriptionParams, SubscriptionParamsRepr,
};
use pyth_lazer_protocol::subscription::{Request, Response, SubscribeRequest, SubscriptionId};
use tokio::pin;

fn get_lazer_access_token() -> String {
// Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
Expand All @@ -22,7 +25,8 @@ async fn main() -> anyhow::Result<()> {
"wss://pyth-lazer.dourolabs.app/v1/stream",
&get_lazer_access_token(),
)?;
let mut stream = client.start().await?;
let stream = client.start().await?;
pin!(stream);

let subscription_requests = vec![
// Example subscription: Parsed JSON feed targeting Solana
Expand All @@ -36,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
PriceFeedProperty::BestAskPrice,
PriceFeedProperty::BestBidPrice,
],
chains: vec![Chain::Solana],
formats: vec![Format::Solana],
delivery_format: DeliveryFormat::Json,
json_binary_encoding: JsonBinaryEncoding::Base64,
parsed: true,
Expand All @@ -57,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
PriceFeedProperty::BestAskPrice,
PriceFeedProperty::BestBidPrice,
],
chains: vec![Chain::Evm, Chain::Solana],
formats: vec![Format::Evm, Format::Solana],
delivery_format: DeliveryFormat::Binary,
json_binary_encoding: JsonBinaryEncoding::Base64,
parsed: false,
Expand All @@ -80,49 +84,108 @@ async fn main() -> anyhow::Result<()> {
while let Some(msg) = stream.next().await {
// The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
match msg? {
Response::StreamUpdated(update) => {
if let Some(evm_data) = update.payload.evm {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
let evm_message = EvmMessage::deserialize_slice(&binary_data)?;

// Parse and verify the EVM message
let payload = parse_and_verify_evm_message(&evm_message);
println!("EVM payload: {payload:?}\n");
}
AnyResponse::Json(msg) => match msg {
Response::StreamUpdated(update) => {
println!("Received a JSON update for {:?}", update.subscription_id);
if let Some(evm_data) = update.payload.evm {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
let evm_message = EvmMessage::deserialize_slice(&binary_data)?;

// Parse and verify the EVM message
let payload = parse_and_verify_evm_message(&evm_message);
println!("EVM payload: {payload:?}");
}

if let Some(solana_data) = update.payload.solana {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;
if let Some(solana_data) = update.payload.solana {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;

// Parse and verify the Solana message
let payload = parse_and_verify_solana_message(&solana_message);
println!("Solana payload: {payload:?}\n");
}
// Parse and verify the Solana message
let payload = parse_and_verify_solana_message(&solana_message);
println!("Solana payload: {payload:?}");
}

if let Some(parsed) = update.payload.parsed {
// Parsed payloads (`parsed: true`) are already decoded and ready to use
for feed in parsed.price_feeds {
println!(
"Parsed payload: {:?}: {:?} at {:?}\n",
feed.price_feed_id, feed, parsed.timestamp_us
);
if let Some(data) = update.payload.le_ecdsa {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&data.data)?;
let message = LeEcdsaMessage::deserialize_slice(&binary_data)?;

// Parse and verify the message
let payload = parse_and_verify_le_ecdsa_message(&message);
println!("LeEcdsa payload: {payload:?}");
}

if let Some(data) = update.payload.le_unsigned {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&data.data)?;
let message = LeUnsignedMessage::deserialize_slice(&binary_data)?;

// Parse the message
let payload = PayloadData::deserialize_slice_le(&message.payload)?;
println!("LE unsigned payload: {payload:?}");
}

if let Some(parsed) = update.payload.parsed {
// Parsed payloads (`parsed: true`) are already decoded and ready to use
for feed in parsed.price_feeds {
println!(
"Parsed payload: {:?}: {:?} at {:?}",
feed.price_feed_id, feed, parsed.timestamp_us
);
}
}
}
msg => println!("Received non-update message: {msg:?}"),
},
AnyResponse::Binary(msg) => {
println!("Received a binary update for {:?}", msg.subscription_id);
for message in msg.messages {
match message {
Message::Evm(message) => {
// Parse and verify the EVM message
let payload = parse_and_verify_evm_message(&message);
println!("EVM payload: {payload:?}");
}
Message::Solana(message) => {
// Parse and verify the Solana message
let payload = parse_and_verify_solana_message(&message);
println!("Solana payload: {payload:?}");
}
Message::LeEcdsa(message) => {
let payload = parse_and_verify_le_ecdsa_message(&message);
println!("LeEcdsa payload: {payload:?}");
}
Message::LeUnsigned(message) => {
let payload = PayloadData::deserialize_slice_le(&message.payload)?;
println!("LeUnsigned payload: {payload:?}");
}
Message::Json(message) => {
for feed in message.price_feeds {
println!(
"JSON payload: {:?}: {:?} at {:?}",
feed.price_feed_id, feed, message.timestamp_us
);
}
}
}
}
}
_ => println!("Received non-update message"),
}
println!();

count += 1;
if count >= 50 {
break;
}
}

// Unsubscribe before exiting
// Unsubscribe example
for sub_id in [SubscriptionId(1), SubscriptionId(2)] {
client.unsubscribe(sub_id).await?;
println!("Unsubscribed from {:?}", sub_id);
Expand All @@ -147,12 +210,32 @@ fn parse_and_verify_solana_message(solana_message: &SolanaMessage) -> anyhow::Re

fn parse_and_verify_evm_message(evm_message: &EvmMessage) -> anyhow::Result<PayloadData> {
// Recover pubkey from message
libsecp256k1::recover(
let public_key = libsecp256k1::recover(
&libsecp256k1::Message::parse(&alloy_primitives::keccak256(&evm_message.payload)),
&libsecp256k1::Signature::parse_standard(&evm_message.signature)?,
&libsecp256k1::RecoveryId::parse(evm_message.recovery_id)?,
)?;
println!(
"evm address recovered from signature: {:?}",
hex::encode(&alloy_primitives::keccak256(&public_key.serialize()[1..])[12..])
);

let payload = PayloadData::deserialize_slice_be(&evm_message.payload)?;
Ok(payload)
}

fn parse_and_verify_le_ecdsa_message(message: &LeEcdsaMessage) -> anyhow::Result<PayloadData> {
// Recover pubkey from message
let public_key = libsecp256k1::recover(
&libsecp256k1::Message::parse(&alloy_primitives::keccak256(&message.payload)),
&libsecp256k1::Signature::parse_standard(&message.signature)?,
&libsecp256k1::RecoveryId::parse(message.recovery_id)?,
)?;
println!(
"evm address recovered from signature: {:?}",
hex::encode(&alloy_primitives::keccak256(&public_key.serialize()[1..])[12..])
);

let payload = PayloadData::deserialize_slice_le(&message.payload)?;
Ok(payload)
}
51 changes: 34 additions & 17 deletions lazer/sdk/rust/client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use pyth_lazer_protocol::subscription::{
ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest,
use derive_more::From;
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use pyth_lazer_protocol::{
binary_update::BinaryWsUpdate,
subscription::{ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest},
};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use url::Url;

/// A WebSocket client for consuming Pyth Lazer price feed updates
///
/// This client provides a simple interface to:
Expand All @@ -25,6 +28,12 @@ pub struct LazerClient {
>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, From)]
pub enum AnyResponse {
Json(Response),
Binary(BinaryWsUpdate),
}

impl LazerClient {
/// Creates a new Lazer client instance
///
Expand All @@ -48,7 +57,7 @@ impl LazerClient {
///
/// # Returns
/// Returns a stream of responses from the server
pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<Response>>> {
pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<AnyResponse>>> {
let url = self.endpoint.clone();
let mut request =
tokio_tungstenite::tungstenite::client::IntoClientRequest::into_client_request(url)?;
Expand All @@ -62,19 +71,27 @@ impl LazerClient {
let (ws_sender, ws_receiver) = ws_stream.split();

self.ws_sender = Some(ws_sender);
let response_stream = ws_receiver.map(|msg| -> Result<Response> {
let msg = msg?;
match msg {
Message::Text(text) => Ok(serde_json::from_str(&text)?),
Message::Binary(data) => Ok(Response::from_binary(&data)?),
Message::Close(_) => Ok(Response::Error(ErrorResponse {
error: "WebSocket connection closed".to_string(),
})),
_ => Ok(Response::Error(ErrorResponse {
error: "Unexpected message type".to_string(),
})),
}
});
let response_stream =
ws_receiver
.map_err(anyhow::Error::from)
.try_filter_map(|msg| async {
let r: Result<Option<AnyResponse>> = match msg {
Message::Text(text) => {
Ok(Some(serde_json::from_str::<Response>(&text)?.into()))
}
Message::Binary(data) => {
Ok(Some(BinaryWsUpdate::deserialize_slice(&data)?.into()))
}
Message::Close(_) => Ok(Some(
Response::Error(ErrorResponse {
error: "WebSocket connection closed".to_string(),
})
.into(),
)),
_ => Ok(None),
};
r
});

Ok(response_stream)
}
Expand Down
2 changes: 1 addition & 1 deletion lazer/sdk/rust/protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-lazer-protocol"
version = "0.5.1"
version = "0.6.0"
edition = "2021"
description = "Pyth Lazer SDK - protocol types."
license = "Apache-2.0"
Expand Down
13 changes: 8 additions & 5 deletions lazer/sdk/rust/protocol/examples/parse_and_verify.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use {
anyhow::bail,
byteorder::{ReadBytesExt, BE},
byteorder::{ReadBytesExt, LE},
pyth_lazer_protocol::{
message::{EvmMessage, SolanaMessage},
payload::{PayloadData, EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC_BE},
message::{
format_magics_le::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC},
EvmMessage, SolanaMessage,
},
payload::PayloadData,
},
std::io::{stdin, BufRead, Cursor},
};
Expand All @@ -12,8 +15,8 @@ fn main() -> anyhow::Result<()> {
println!("Reading hex encoded payloads from stdin...");
for line in stdin().lock().lines() {
let message = hex::decode(line?.trim())?;
let magic = Cursor::new(&message).read_u32::<BE>()?;
if magic == SOLANA_FORMAT_MAGIC_BE {
let magic = Cursor::new(&message).read_u32::<LE>()?;
if magic == SOLANA_FORMAT_MAGIC {
println!("this is a solana payload");
let message = SolanaMessage::deserialize_slice(&message)?;
println!(
Expand Down
6 changes: 4 additions & 2 deletions lazer/sdk/rust/protocol/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use serde::{Deserialize, Serialize};

use crate::router::{
Chain, Channel, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty,
Channel, Format, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty,
};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LatestPriceRequest {
pub price_feed_ids: Vec<PriceFeedId>,
pub properties: Vec<PriceFeedProperty>,
pub chains: Vec<Chain>,
// "chains" was renamed to "formats". "chains" is still supported for compatibility.
#[serde(alias = "chains")]
pub formats: Vec<Format>,
#[serde(default)]
pub json_binary_encoding: JsonBinaryEncoding,
/// If `true`, the stream update will contain a JSON object containing
Expand Down
Loading
Loading