|
| 1 | +use anyhow::Result; |
| 2 | +use futures_util::{SinkExt, StreamExt}; |
| 3 | +use pyth_lazer_protocol::{ |
| 4 | + message::{EvmMessage, SolanaMessage}, |
| 5 | + payload::{BINARY_UPDATE_FORMAT_MAGIC, EVM_FORMAT_MAGIC, PARSED_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC_BE}, |
| 6 | + router::{JsonBinaryData, JsonBinaryEncoding, JsonUpdate}, |
| 7 | + subscription::{Request, Response, SubscriptionId, StreamUpdatedResponse, ErrorResponse, UnsubscribeRequest}, |
| 8 | +}; |
| 9 | +use tokio_tungstenite::{connect_async, tungstenite::Message}; |
| 10 | +use url::Url; |
| 11 | + |
| 12 | +/// Response type for binary messages containing chain-specific data |
| 13 | +#[derive(Debug)] |
| 14 | +pub enum BinaryResponse { |
| 15 | + /// EVM chain message with payload and signature |
| 16 | + Evm(EvmMessage), |
| 17 | + /// Solana chain message with payload and signature |
| 18 | + Solana(SolanaMessage), |
| 19 | + /// Parsed JSON payload for human-readable format |
| 20 | + Parsed(serde_json::Value), |
| 21 | +} |
| 22 | + |
| 23 | +/// A WebSocket client for consuming Pyth Lazer price feed updates |
| 24 | +/// |
| 25 | +/// This client provides a simple interface to: |
| 26 | +/// - Connect to a Lazer WebSocket endpoint |
| 27 | +/// - Subscribe to price feed updates |
| 28 | +/// - Receive updates as a stream of messages |
| 29 | +/// |
| 30 | +/// # Example |
| 31 | +/// ```no_run |
| 32 | +/// use pyth_lazer_consumer::LazerConsumerClient; |
| 33 | +/// use protocol::subscription::{Request, SubscribeRequest, SubscriptionParams}; |
| 34 | +/// |
| 35 | +/// #[tokio::main] |
| 36 | +/// async fn main() -> anyhow::Result<()> { |
| 37 | +/// let (mut client, mut stream) = LazerConsumerClient::start("wss://endpoint").await?; |
| 38 | +/// |
| 39 | +/// // Subscribe to price feeds |
| 40 | +/// client.subscribe(Request::Subscribe(SubscribeRequest { |
| 41 | +/// subscription_id: SubscriptionId(1), |
| 42 | +/// params: SubscriptionParams { /* ... */ }, |
| 43 | +/// })).await?; |
| 44 | +/// |
| 45 | +/// // Process updates |
| 46 | +/// while let Some(msg) = stream.next().await { |
| 47 | +/// println!("Received: {:?}", msg?); |
| 48 | +/// } |
| 49 | +/// Ok(()) |
| 50 | +/// } |
| 51 | +/// ``` |
| 52 | +pub struct LazerConsumerClient { |
| 53 | + ws_sender: futures_util::stream::SplitSink< |
| 54 | + tokio_tungstenite::WebSocketStream< |
| 55 | + tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream> |
| 56 | + >, |
| 57 | + Message, |
| 58 | + >, |
| 59 | +} |
| 60 | + |
| 61 | +impl LazerConsumerClient { |
| 62 | + /// Starts a new WebSocket connection to the Lazer endpoint |
| 63 | + /// |
| 64 | + /// # Arguments |
| 65 | + /// * `url` - The WebSocket URL of the Lazer service |
| 66 | + /// |
| 67 | + /// # Returns |
| 68 | + /// Returns a tuple containing: |
| 69 | + /// - The client instance for sending requests |
| 70 | + /// - A stream of responses from the server |
| 71 | + pub async fn start(url: &str) -> Result<(Self, impl futures_util::Stream<Item = Result<Response>>)> { |
| 72 | + let url = Url::parse(url)?; |
| 73 | + let (ws_stream, _) = connect_async(url).await?; |
| 74 | + let (ws_sender, ws_receiver) = ws_stream.split(); |
| 75 | + |
| 76 | + let client = Self { ws_sender }; |
| 77 | + let response_stream = ws_receiver.map(|msg| -> Result<Response> { |
| 78 | + let msg = msg?; |
| 79 | + match msg { |
| 80 | + Message::Text(text) => Ok(serde_json::from_str(&text)?), |
| 81 | + Message::Binary(data) => { |
| 82 | + let mut pos = 0; |
| 83 | + let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?); |
| 84 | + pos += 4; |
| 85 | + |
| 86 | + if magic != BINARY_UPDATE_FORMAT_MAGIC { |
| 87 | + anyhow::bail!("binary update format magic mismatch"); |
| 88 | + } |
| 89 | + |
| 90 | + let subscription_id = SubscriptionId(u64::from_be_bytes(data[pos..pos + 8].try_into()?)); |
| 91 | + pos += 8; |
| 92 | + |
| 93 | + let mut evm = None; |
| 94 | + let mut solana = None; |
| 95 | + let mut parsed = None; |
| 96 | + |
| 97 | + while pos < data.len() { |
| 98 | + let len = u16::from_be_bytes(data[pos..pos + 2].try_into()?) as usize; |
| 99 | + pos += 2; |
| 100 | + let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?); |
| 101 | + |
| 102 | + match magic { |
| 103 | + EVM_FORMAT_MAGIC => { |
| 104 | + evm = Some(EvmMessage::deserialize_slice(&data[pos..pos + len])?); |
| 105 | + } |
| 106 | + SOLANA_FORMAT_MAGIC_BE => { |
| 107 | + solana = Some(SolanaMessage::deserialize_slice(&data[pos..pos + len])?); |
| 108 | + } |
| 109 | + PARSED_FORMAT_MAGIC => { |
| 110 | + parsed = Some(serde_json::from_slice(&data[pos + 4..pos + len])?); |
| 111 | + } |
| 112 | + _ => anyhow::bail!("unknown magic: {}", magic), |
| 113 | + } |
| 114 | + pos += len; |
| 115 | + } |
| 116 | + |
| 117 | + Ok(Response::StreamUpdated(StreamUpdatedResponse { |
| 118 | + subscription_id, |
| 119 | + payload: JsonUpdate { |
| 120 | + evm: evm.map(|m| JsonBinaryData { |
| 121 | + encoding: JsonBinaryEncoding::Base64, |
| 122 | + data: base64::encode(&m.payload), |
| 123 | + }), |
| 124 | + solana: solana.map(|m| JsonBinaryData { |
| 125 | + encoding: JsonBinaryEncoding::Base64, |
| 126 | + data: base64::encode(&m.payload), |
| 127 | + }), |
| 128 | + parsed, |
| 129 | + }, |
| 130 | + })) |
| 131 | + } |
| 132 | + Message::Close(_) => Ok(Response::Error(ErrorResponse { |
| 133 | + error: "WebSocket connection closed".to_string(), |
| 134 | + })), |
| 135 | + _ => Ok(Response::Error(ErrorResponse { |
| 136 | + error: "Unexpected message type".to_string(), |
| 137 | + })), |
| 138 | + } |
| 139 | + }); |
| 140 | + |
| 141 | + Ok((client, response_stream)) |
| 142 | + } |
| 143 | + |
| 144 | + /// Subscribes to price feed updates |
| 145 | + /// |
| 146 | + /// # Arguments |
| 147 | + /// * `request` - A subscription request containing feed IDs and parameters |
| 148 | + pub async fn subscribe(&mut self, request: Request) -> Result<()> { |
| 149 | + let msg = serde_json::to_string(&request)?; |
| 150 | + self.ws_sender.send(Message::Text(msg)).await?; |
| 151 | + Ok(()) |
| 152 | + } |
| 153 | + |
| 154 | + /// Unsubscribes from a previously subscribed feed |
| 155 | + /// |
| 156 | + /// # Arguments |
| 157 | + /// * `subscription_id` - The ID of the subscription to cancel |
| 158 | + pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> { |
| 159 | + let request = Request::Unsubscribe(UnsubscribeRequest { |
| 160 | + subscription_id, |
| 161 | + }); |
| 162 | + let msg = serde_json::to_string(&request)?; |
| 163 | + self.ws_sender.send(Message::Text(msg)).await?; |
| 164 | + Ok(()) |
| 165 | + } |
| 166 | +} |
0 commit comments