1+ use base64:: Engine ;
12use futures_util:: StreamExt ;
23use pyth_lazer_client:: LazerClient ;
4+ use pyth_lazer_protocol:: message:: { EvmMessage , SolanaMessage } ;
5+ use pyth_lazer_protocol:: payload:: PayloadData ;
36use pyth_lazer_protocol:: router:: {
47 Chain , Channel , DeliveryFormat , FixedRate , JsonBinaryEncoding , PriceFeedId , PriceFeedProperty ,
58 SubscriptionParams , SubscriptionParamsRepr ,
69} ;
7- use pyth_lazer_protocol:: subscription:: { Request , SubscribeRequest , SubscriptionId } ;
10+ use pyth_lazer_protocol:: subscription:: { Request , Response , SubscribeRequest , SubscriptionId } ;
811
912fn get_lazer_access_token ( ) -> String {
1013 // Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
@@ -75,7 +78,44 @@ async fn main() -> anyhow::Result<()> {
7578 // Process the first few updates
7679 let mut count = 0 ;
7780 while let Some ( msg) = stream. next ( ) . await {
78- println ! ( "Received update: {:?}" , msg?) ;
81+ // The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
82+ match msg? {
83+ Response :: StreamUpdated ( update) => {
84+ if let Some ( evm_data) = update. payload . evm {
85+ // Decode binary data
86+ let binary_data =
87+ base64:: engine:: general_purpose:: STANDARD . decode ( & evm_data. data ) ?;
88+ let evm_message = EvmMessage :: deserialize_slice ( & binary_data) ?;
89+
90+ // Parse and verify the EVM message
91+ let payload = parse_and_verify_evm_message ( & evm_message) ;
92+ println ! ( "EVM payload: {payload:?}\n " ) ;
93+ }
94+
95+ if let Some ( solana_data) = update. payload . solana {
96+ // Decode binary data
97+ let binary_data =
98+ base64:: engine:: general_purpose:: STANDARD . decode ( & solana_data. data ) ?;
99+ let solana_message = SolanaMessage :: deserialize_slice ( & binary_data) ?;
100+
101+ // Parse and verify the Solana message
102+ let payload = parse_and_verify_solana_message ( & solana_message) ;
103+ println ! ( "Solana payload: {payload:?}\n " ) ;
104+ }
105+
106+ if let Some ( parsed) = update. payload . parsed {
107+ // Parsed payloads (`parsed: true`) are already decoded and ready to use
108+ for feed in parsed. price_feeds {
109+ println ! (
110+ "Parsed payload: {:?}: {:?} at {:?}\n " ,
111+ feed. price_feed_id, feed, parsed. timestamp_us
112+ ) ;
113+ }
114+ }
115+ }
116+ _ => println ! ( "Received non-update message" ) ,
117+ }
118+
79119 count += 1 ;
80120 if count >= 50 {
81121 break ;
@@ -92,3 +132,27 @@ async fn main() -> anyhow::Result<()> {
92132 client. close ( ) . await ?;
93133 Ok ( ( ) )
94134}
135+
136+ fn parse_and_verify_solana_message ( solana_message : & SolanaMessage ) -> anyhow:: Result < PayloadData > {
137+ // Verify signature using the pubkey
138+ let public_key = ed25519_dalek:: VerifyingKey :: from_bytes ( & solana_message. public_key ) ?;
139+ public_key. verify_strict (
140+ & solana_message. payload ,
141+ & ed25519_dalek:: Signature :: from_bytes ( & solana_message. signature ) ,
142+ ) ?;
143+
144+ let payload = PayloadData :: deserialize_slice_le ( & solana_message. payload ) ?;
145+ Ok ( payload)
146+ }
147+
148+ fn parse_and_verify_evm_message ( evm_message : & EvmMessage ) -> anyhow:: Result < PayloadData > {
149+ // Recover pubkey from message
150+ libsecp256k1:: recover (
151+ & libsecp256k1:: Message :: parse ( & alloy_primitives:: keccak256 ( & evm_message. payload ) ) ,
152+ & libsecp256k1:: Signature :: parse_standard ( & evm_message. signature ) ?,
153+ & libsecp256k1:: RecoveryId :: parse ( evm_message. recovery_id ) ?,
154+ ) ?;
155+
156+ let payload = PayloadData :: deserialize_slice_be ( & evm_message. payload ) ?;
157+ Ok ( payload)
158+ }
0 commit comments