diff --git a/Cargo.lock b/Cargo.lock index 3c05101956..258bceb114 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5023,15 +5023,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" -[[package]] -name = "ordered-float" -version = "2.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" -dependencies = [ - "num-traits", -] - [[package]] name = "ordered-multimap" version = "0.7.3" @@ -5617,7 +5608,7 @@ dependencies = [ [[package]] name = "pyth-lazer-agent" -version = "0.3.3" +version = "0.4.0" dependencies = [ "anyhow", "backoff", @@ -5635,8 +5626,8 @@ dependencies = [ "hyper 1.6.0", "hyper-util", "protobuf", - "pyth-lazer-protocol 0.8.1", - "pyth-lazer-publisher-sdk 0.1.7", + "pyth-lazer-protocol 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "pyth-lazer-publisher-sdk 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.12.22", "serde", "serde_json", @@ -5679,15 +5670,22 @@ dependencies = [ [[package]] name = "pyth-lazer-protocol" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1258b8770756a82a39b7b02a296c10a91b93aa58c0cded47950defe4d9377644" +version = "0.10.1" dependencies = [ + "alloy-primitives 0.8.25", "anyhow", + "bincode 1.3.3", + "bs58", "byteorder", + "chrono", "derive_more 1.0.0", + "ed25519-dalek 2.1.1", + "hex", + "humantime", "humantime-serde", "itertools 0.13.0", + "libsecp256k1 0.7.2", + "mry", "protobuf", "rust_decimal", "serde", @@ -5697,21 +5695,17 @@ dependencies = [ [[package]] name = "pyth-lazer-protocol" version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d321e49be0315d68f07d097d240701a05e003e05eff5ac9f2d0457d4a606dd92" dependencies = [ - "alloy-primitives 0.8.25", "anyhow", - "bincode 1.3.3", - "bs58", "byteorder", "chrono", "derive_more 1.0.0", - "ed25519-dalek 2.1.1", "hex", "humantime", "humantime-serde", "itertools 0.13.0", - "libsecp256k1 0.7.2", - "mry", "protobuf", "rust_decimal", "serde", @@ -5720,29 +5714,27 @@ dependencies = [ [[package]] name = "pyth-lazer-publisher-sdk" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8d52a515b21b77a89266d584da4363fcd1e121213ac3065ab7ff0dab1172006" +version = "0.3.0" dependencies = [ "anyhow", "fs-err", - "humantime", "protobuf", "protobuf-codegen", - "pyth-lazer-protocol 0.8.1", - "serde-value", - "tracing", + "pyth-lazer-protocol 0.10.1", + "serde_json", ] [[package]] name = "pyth-lazer-publisher-sdk" version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bebeacbc58d9e0143e03a397b08becbed1dacf5baad6a245bc00f74ca5cc50d" dependencies = [ "anyhow", "fs-err", "protobuf", "protobuf-codegen", - "pyth-lazer-protocol 0.10.1", + "pyth-lazer-protocol 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json", ] @@ -7020,16 +7012,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde-value" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" -dependencies = [ - "ordered-float", - "serde", -] - [[package]] name = "serde_bytes" version = "0.11.17" diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index 7ef3c99512..1dcae42e12 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -1,14 +1,14 @@ [package] name = "pyth-lazer-agent" -version = "0.3.3" +version = "0.4.0" edition = "2024" description = "Pyth Lazer Agent" license = "Apache-2.0" repository = "https://github.com/pyth-network/pyth-crosschain" [dependencies] -pyth-lazer-publisher-sdk = "0.1.7" -pyth-lazer-protocol = "0.8.1" +pyth-lazer-publisher-sdk = "0.3.0" +pyth-lazer-protocol = "0.10.1" anyhow = "1.0.98" backoff = "0.4.0" diff --git a/apps/pyth-lazer-agent/README.md b/apps/pyth-lazer-agent/README.md index f67387a7d2..cd6455e4c7 100644 --- a/apps/pyth-lazer-agent/README.md +++ b/apps/pyth-lazer-agent/README.md @@ -1,7 +1,7 @@ # pyth-lazer-agent pyth-lazer-agent is intended to be run by Lazer publishers analogous to [pyth-agent](https://github.com/pyth-network/pyth-agent) -for pythnet publishers. Currently it retains [the existing Lazer publishing interface](https://github.com/pyth-network/pyth-examples/tree/main/lazer/publisher), +for pythnet publishers. Currently, it retains [the existing Lazer publishing interface](https://github.com/pyth-network/pyth-examples/tree/main/lazer/publisher), but will batch and sign transactions before publishing them to Lazer. ## Keypair @@ -16,6 +16,18 @@ pyth-lazer-agent will need to configure access to this keypair file to sign tran ## Build and run +### From cargo +``` +# Download the cargo package +cargo install pyth-lazer-agent + +# Add .cargo/bin to PATH +export PATH="$PATH:~/.cargo/bin" + +# Run the agent +pyth-lazer-agent --help +``` + ### From source Please check [rust-toolchain](rust-toolchain.toml) to see the version of Rust needed to build (currently 1.88). You will also need SSL and CA certificates. `cargo build` should then work as usual. @@ -32,7 +44,7 @@ The agent takes a single `--config` CLI option, pointing at `config/config.toml` by default. Configuration is currently minimal: ```toml -relayer_urls = ["ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction", "ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction"] +relayer_urls = ["wss://relayer.pyth-lazer-staging.dourolabs.app/v1/transaction", "wss://relayer-1.pyth-lazer-staging.dourolabs.app/v1/transaction"] publish_keypair_path = "/path/to/keypair.json" authorization_token = "your_token" listen_address = "0.0.0.0:8910" @@ -44,8 +56,3 @@ publish_interval_duration = "25ms" - `authorization_token`: The Lazer team will provide this or instruct that it can be omitted. - `listen_address`: The local port the agent will be listening on; can be anything you want. - `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here. - -## Publish - -Please use the `/v1/publisher` or `/v2/publisher` endpoints and the corresponding `PriceFeedDataV1` and `PriceFeedDataV2` -schemas as defined in [the sdk](https://github.com/pyth-network/pyth-crosschain/blob/main/lazer/sdk/rust/protocol/src/publisher.rs). diff --git a/apps/pyth-lazer-agent/config/config.toml b/apps/pyth-lazer-agent/config/config.toml index 263904ccb5..4bf417c25a 100644 --- a/apps/pyth-lazer-agent/config/config.toml +++ b/apps/pyth-lazer-agent/config/config.toml @@ -1,4 +1,4 @@ -relayer_urls = ["wss://relayer-0.pyth-lazer.dourolabs.app/v1/transaction", "wss://relayer-0.pyth-lazer.dourolabs.app/v1/transaction"] +relayer_urls = ["wss://relayer.pyth-lazer-staging.dourolabs.app/v1/transaction", "wss://relayer-1.pyth-lazer-staging.dourolabs.app/v1/transaction"] publish_keypair_path = "/path/to/solana/id.json" listen_address = "0.0.0.0:8910" publish_interval_duration = "25ms" diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index b0c752910c..dea9d337a0 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -1,13 +1,13 @@ use crate::config::Config; use crate::lazer_publisher::LazerPublisher; -use crate::websocket_utils::{handle_websocket_error, send_text}; +use crate::websocket_utils::{handle_websocket_error, send_json, send_text}; use anyhow::Error; use futures::{AsyncRead, AsyncWrite}; use futures_util::io::{BufReader, BufWriter}; use hyper_util::rt::TokioIo; use pyth_lazer_protocol::jrpc::{ - GetMetadataParams, JrpcCall, JrpcError, JrpcErrorResponse, JrpcResponse, JrpcSuccessResponse, - JsonRpcVersion, PythLazerAgentJrpcV1, SymbolMetadata, + FeedUpdateParams, GetMetadataParams, JrpcCall, JrpcError, JrpcErrorResponse, JrpcResponse, + JrpcSuccessResponse, JsonRpcVersion, PythLazerAgentJrpcV1, SymbolMetadata, }; use soketto::Sender; use soketto::handshake::http::Server; @@ -82,7 +82,7 @@ async fn try_handle_jrpc( serde_json::to_string::>(&JrpcResponse::Error( JrpcErrorResponse { jsonrpc: JsonRpcVersion::V2, - error: JrpcError::InternalError.into(), + error: JrpcError::InternalError(err.to_string()).into(), id: None, }, ))? @@ -103,93 +103,40 @@ async fn handle_jrpc_inner( match serde_json::from_slice::(receive_buf.as_slice()) { Ok(jrpc_request) => match jrpc_request.params { JrpcCall::PushUpdate(request_params) => { - match lazer_publisher - .push_feed_update(request_params.into()) - .await - { - Ok(_) => { - send_text( - sender, - serde_json::to_string::>(&JrpcResponse::Success( - JrpcSuccessResponse:: { - jsonrpc: JsonRpcVersion::V2, - result: "success".to_string(), - id: jrpc_request.id, - }, - ))? - .as_str(), - ) - .await?; - } - Err(err) => { - debug!("error while sending updates: {:?}", err); - send_text( - sender, - serde_json::to_string::>(&JrpcResponse::Error( - JrpcErrorResponse { - jsonrpc: JsonRpcVersion::V2, - error: JrpcError::InternalError.into(), - id: Some(jrpc_request.id), - }, - ))? - .as_str(), - ) - .await?; - } - } + handle_push_update(sender, lazer_publisher, request_params, jrpc_request.id).await } - JrpcCall::GetMetadata(request_params) => match get_metadata(config.clone()).await { - Ok(symbols) => { - let symbols = filter_symbols(symbols.clone(), request_params); - - send_text( - sender, - serde_json::to_string::>>( - &JrpcResponse::Success(JrpcSuccessResponse::> { - jsonrpc: JsonRpcVersion::V2, - result: symbols, - id: jrpc_request.id, - }), - )? - .as_str(), - ) - .await?; - } - Err(err) => { - error!("error while retrieving metadata: {:?}", err); - send_text( + JrpcCall::GetMetadata(request_params) => { + if let Some(request_id) = jrpc_request.id { + handle_get_metadata(sender, config, request_params, request_id).await + } else { + send_json( sender, - serde_json::to_string::>(&JrpcResponse::Error( - JrpcErrorResponse { - jsonrpc: JsonRpcVersion::V2, - // note: right now specifying an invalid method results in a parse error - error: JrpcError::InternalError.into(), - id: None, - }, - ))? - .as_str(), + &JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::ParseError( + "The request to method 'get_metadata' requires an 'id'".to_string(), + ) + .into(), + id: None, + }, ) - .await?; + .await } - }, + } }, Err(err) => { debug!("Error parsing JRPC request: {}", err); - send_text( + send_json( sender, - serde_json::to_string::>(&JrpcResponse::Error( - JrpcErrorResponse { - jsonrpc: JsonRpcVersion::V2, - error: JrpcError::ParseError(err.to_string()).into(), - id: None, - }, - ))? - .as_str(), + &JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::ParseError(err.to_string()).into(), + id: None, + }, ) - .await?; + .await } } - Ok(()) } async fn get_metadata(config: Config) -> Result, Error> { @@ -242,6 +189,81 @@ fn filter_symbols( res } +async fn handle_push_update( + sender: &mut Sender, + lazer_publisher: &LazerPublisher, + request_params: FeedUpdateParams, + request_id: Option, +) -> anyhow::Result<()> { + match lazer_publisher + .push_feed_update(request_params.clone().into()) + .await + { + Ok(_) => { + if let Some(request_id) = request_id { + send_json( + sender, + &JrpcSuccessResponse:: { + jsonrpc: JsonRpcVersion::V2, + result: "success".to_string(), + id: request_id, + }, + ) + .await + } else { + Ok(()) + } + } + Err(err) => { + debug!("error while sending updates: {:?}", err); + send_json( + sender, + &JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::SendUpdateError(request_params).into(), + id: request_id, + }, + ) + .await + } + } +} + +async fn handle_get_metadata( + sender: &mut Sender, + config: &Config, + request_params: GetMetadataParams, + request_id: i64, +) -> anyhow::Result<()> { + match get_metadata(config.clone()).await { + Ok(symbols) => { + let symbols = filter_symbols(symbols.clone(), request_params); + + send_json( + sender, + &JrpcSuccessResponse::> { + jsonrpc: JsonRpcVersion::V2, + result: symbols, + id: request_id, + }, + ) + .await + } + Err(err) => { + error!("error while retrieving metadata: {:?}", err); + send_json( + sender, + &JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::InternalError(err.to_string()).into(), + id: Some(request_id), + }, + ) + .await + } + } +} + #[cfg(test)] pub mod tests { use super::*; diff --git a/apps/pyth-lazer-agent/src/publisher_handle.rs b/apps/pyth-lazer-agent/src/publisher_handle.rs index a75ddc6d87..a139e1d22d 100644 --- a/apps/pyth-lazer-agent/src/publisher_handle.rs +++ b/apps/pyth-lazer-agent/src/publisher_handle.rs @@ -4,7 +4,6 @@ use anyhow::bail; use futures_util::io::{BufReader, BufWriter}; use hyper_util::rt::TokioIo; use protobuf::MessageField; -use protobuf::well_known_types::timestamp::Timestamp; use pyth_lazer_protocol::publisher::{ PriceFeedDataV1, PriceFeedDataV2, ServerResponse, UpdateDeserializationErrorResponse, }; @@ -86,32 +85,17 @@ async fn try_handle_publisher( &receive_buf, bincode::config::legacy(), ) { - Ok((data, _)) => { - let source_timestamp = MessageField::some(Timestamp { - #[allow( - clippy::cast_possible_wrap, - reason = "Unix seconds won't wrap any time soon" - )] - seconds: (data.source_timestamp_us.0 / 1_000_000) as i64, - #[allow( - clippy::cast_possible_truncation, - reason = "this value will always be less than one billion" - )] - nanos: (data.source_timestamp_us.0 % 1_000_000 * 1000) as i32, - special_fields: Default::default(), - }); - FeedUpdate { - feed_id: Some(data.price_feed_id.0), - source_timestamp, - update: Some(Update::PriceUpdate(PriceUpdate { - price: data.price.map(|p| p.0.get()), - best_bid_price: data.best_bid_price.map(|p| p.0.get()), - best_ask_price: data.best_ask_price.map(|p| p.0.get()), - ..PriceUpdate::default() - })), - special_fields: Default::default(), - } - } + Ok((data, _)) => FeedUpdate { + feed_id: Some(data.price_feed_id.0), + source_timestamp: MessageField::some(data.source_timestamp_us.into()), + update: Some(Update::PriceUpdate(PriceUpdate { + price: data.price.map(|p| p.0.get()), + best_bid_price: data.best_bid_price.map(|p| p.0.get()), + best_ask_price: data.best_ask_price.map(|p| p.0.get()), + ..PriceUpdate::default() + })), + special_fields: Default::default(), + }, Err(err) => { error_count += 1; if error_count <= MAX_ERROR_LOG { @@ -137,31 +121,16 @@ async fn try_handle_publisher( &receive_buf, bincode::config::legacy(), ) { - Ok((data, _)) => { - let source_timestamp = MessageField::some(Timestamp { - #[allow( - clippy::cast_possible_wrap, - reason = "Unix seconds won't wrap any time soon" - )] - seconds: (data.source_timestamp_us.0 / 1_000_000) as i64, - #[allow( - clippy::cast_possible_truncation, - reason = "this value will always be less than one billion" - )] - nanos: (data.source_timestamp_us.0 % 1_000_000 * 1000) as i32, - special_fields: Default::default(), - }); - FeedUpdate { - feed_id: Some(data.price_feed_id.0), - source_timestamp, - update: Some(Update::FundingRateUpdate(FundingRateUpdate { - price: data.price.map(|p| p.0.get()), - rate: data.funding_rate.map(|r| r.0), - ..FundingRateUpdate::default() - })), - special_fields: Default::default(), - } - } + Ok((data, _)) => FeedUpdate { + feed_id: Some(data.price_feed_id.0), + source_timestamp: MessageField::some(data.source_timestamp_us.into()), + update: Some(Update::FundingRateUpdate(FundingRateUpdate { + price: data.price.map(|p| p.0.get()), + rate: data.funding_rate.map(|r| r.0), + ..FundingRateUpdate::default() + })), + special_fields: Default::default(), + }, Err(err) => { error_count += 1; if error_count <= MAX_ERROR_LOG { diff --git a/apps/pyth-lazer-agent/src/websocket_utils.rs b/apps/pyth-lazer-agent/src/websocket_utils.rs index 040d1c602b..53b2d763da 100644 --- a/apps/pyth-lazer-agent/src/websocket_utils.rs +++ b/apps/pyth-lazer-agent/src/websocket_utils.rs @@ -41,3 +41,10 @@ pub async fn send_text( }) .await? } + +pub async fn send_json( + sender: &mut Sender, + value: &U, +) -> anyhow::Result<()> { + send_text(sender, &serde_json::to_string(value)?).await +}