diff --git a/Cargo.lock b/Cargo.lock index 6b14e6a..4de4fd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,7 +3400,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.10.1" +version = "2.10.3" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 744d993..672bbd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.10.2" +version = "2.10.3" edition = "2021" [[bin]] diff --git a/config/config.toml b/config/config.toml index 39b254b..ea62bcf 100644 --- a/config/config.toml +++ b/config/config.toml @@ -6,6 +6,18 @@ # connection is not exposed for unauthorized access. listen_address = "127.0.0.1:8910" +# Size of the buffer of each Server's channel on which `notify_price` events are +# received from the Price state. +# notify_price_tx_buffer = 10000 + +# Size of the buffer of each Server's channel on which `notify_price_sched` events are +# received from the Price state. +# notify_price_sched_tx_buffer = 10000 + +# Flush interval for responses and notifications. This is the maximum time the +# server will wait before flushing the messages to the client. +# flush_interval_duration = "50ms" + # Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint. [primary_network] ### Required fields ### @@ -186,8 +198,8 @@ key_store.mapping_key = "RelevantOracleMappingAddress" ## Configuration for OpenTelemetry ## [opentelemetry] -# Timeout in seconds for the OpenTelemetry exporter -exporter_timeout_secs = 3 +# Timeout duration for the OpenTelemetry exporter +exporter_timeout_duration = "3s" # Endpoint URL for the OpenTelemetry exporter exporter_endpoint = "http://127.0.0.1:4317" diff --git a/src/agent/config.rs b/src/agent/config.rs index 8fb2292..40cb234 100644 --- a/src/agent/config.rs +++ b/src/agent/config.rs @@ -13,7 +13,10 @@ use { File, }, serde::Deserialize, - std::path::Path, + std::{ + path::Path, + time::Duration, + }, }; /// Configuration for all components of the Agent @@ -88,6 +91,7 @@ impl Default for ChannelCapacities { #[derive(Deserialize, Debug)] pub struct OpenTelemetryConfig { - pub exporter_timeout_secs: u64, - pub exporter_endpoint: String, + #[serde(with = "humantime_serde")] + pub exporter_timeout_duration: Duration, + pub exporter_endpoint: String, } diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index ea8b7a7..ae52f0e 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -48,6 +48,7 @@ use { fmt::Debug, net::SocketAddr, sync::Arc, + time::Duration, }, tokio::sync::mpsc, tracing::instrument, @@ -115,6 +116,7 @@ async fn handle_connection( state: Arc, notify_price_tx_buffer: usize, notify_price_sched_tx_buffer: usize, + flush_interval_duration: Duration, ) where S: state::Prices, S: Send, @@ -127,6 +129,8 @@ async fn handle_connection( let (mut notify_price_sched_tx, mut notify_price_sched_rx) = mpsc::channel(notify_price_sched_tx_buffer); + let mut flush_interval = tokio::time::interval(flush_interval_duration); + loop { if let Err(err) = handle_next( &*state, @@ -136,6 +140,7 @@ async fn handle_connection( &mut notify_price_rx, &mut notify_price_sched_tx, &mut notify_price_sched_rx, + &mut flush_interval, ) .await { @@ -159,6 +164,7 @@ async fn handle_next( notify_price_rx: &mut mpsc::Receiver, notify_price_sched_tx: &mut mpsc::Sender, notify_price_sched_rx: &mut mpsc::Receiver, + flush_interval: &mut tokio::time::Interval, ) -> Result<()> where S: state::Prices, @@ -183,13 +189,16 @@ where } } Some(notify_price) = notify_price_rx.recv() => { - send_notification(ws_tx, Method::NotifyPrice, Some(notify_price)) + feed_notification(ws_tx, Method::NotifyPrice, Some(notify_price)) .await } Some(notify_price_sched) = notify_price_sched_rx.recv() => { - send_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched)) + feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched)) .await } + _ = flush_interval.tick() => { + flush(ws_tx).await + } } } @@ -229,9 +238,9 @@ where // Send an array if we're handling a batch // request, single response object otherwise if is_batch { - send_text(ws_tx, &serde_json::to_string(&responses)?).await?; + feed_text(ws_tx, &serde_json::to_string(&responses)?).await?; } else { - send_text(ws_tx, &serde_json::to_string(&responses[0])?).await?; + feed_text(ws_tx, &serde_json::to_string(&responses[0])?).await?; } } // The top-level parsing errors are fine to share with client @@ -354,10 +363,10 @@ async fn send_error( error.to_string(), None, ); - send_text(ws_tx, &response.to_string()).await + feed_text(ws_tx, &response.to_string()).await } -async fn send_notification( +async fn feed_notification( ws_tx: &mut SplitSink, method: Method, params: Option, @@ -365,10 +374,10 @@ async fn send_notification( where T: Sized + Serialize + DeserializeOwned, { - send_request(ws_tx, IdReq::Notification, method, params).await + feed_request(ws_tx, IdReq::Notification, method, params).await } -async fn send_request( +async fn feed_request( ws_tx: &mut SplitSink, id: I, method: Method, @@ -379,16 +388,20 @@ where T: Sized + Serialize + DeserializeOwned, { let request = Request::with_params(id, method, params); - send_text(ws_tx, &request.to_string()).await + feed_text(ws_tx, &request.to_string()).await } -async fn send_text(ws_tx: &mut SplitSink, msg: &str) -> Result<()> { +async fn feed_text(ws_tx: &mut SplitSink, msg: &str) -> Result<()> { ws_tx - .send(Message::text(msg.to_string())) + .feed(Message::text(msg.to_string())) .await .map_err(|e| e.into()) } +async fn flush(ws_tx: &mut SplitSink) -> Result<()> { + ws_tx.flush().await.map_err(|e| e.into()) +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { @@ -400,6 +413,9 @@ pub struct Config { /// Size of the buffer of each Server's channel on which `notify_price_sched` events are /// received from the Price state. pub notify_price_sched_tx_buffer: usize, + /// Flush interval duration for the notifications. + #[serde(with = "humantime_serde")] + pub flush_interval_duration: Duration, } impl Default for Config { @@ -408,6 +424,7 @@ impl Default for Config { listen_address: "127.0.0.1:8910".to_string(), notify_price_tx_buffer: 10000, notify_price_sched_tx_buffer: 10000, + flush_interval_duration: Duration::from_millis(50), } } } @@ -448,6 +465,7 @@ where state, config.notify_price_tx_buffer, config.notify_price_sched_tx_buffer, + config.flush_interval_duration, ) .await }) diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 9312c2d..2e1759b 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -14,7 +14,6 @@ use { std::{ io::IsTerminal, path::PathBuf, - time::Duration, }, tracing_subscriber::{ prelude::*, @@ -65,9 +64,7 @@ async fn main() -> Result<()> { let otlp_exporter = opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(&opentelemetry_config.exporter_endpoint) - .with_timeout(Duration::from_secs( - opentelemetry_config.exporter_timeout_secs, - )); + .with_timeout(opentelemetry_config.exporter_timeout_duration); // Set up the OpenTelemetry tracer let tracer = opentelemetry_otlp::new_pipeline()