Skip to content

Commit 729dc0c

Browse files
committed
fix: feed rpc respones and notifications instead of send and flush
1 parent efc17f5 commit 729dc0c

File tree

2 files changed

+49
-12
lines changed

2 files changed

+49
-12
lines changed

config/config.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,18 @@
66
# connection is not exposed for unauthorized access.
77
listen_address = "127.0.0.1:8910"
88

9+
# Size of the buffer of each Server's channel on which `notify_price` events are
10+
# received from the Price state.
11+
# notify_price_tx_buffer = 10000
12+
13+
# Size of the buffer of each Server's channel on which `notify_price_sched` events are
14+
# received from the Price state.
15+
# notify_price_sched_tx_buffer = 10000
16+
17+
# Flush interval for the notifications. This is the maximum time the server
18+
# will wait before flushing the notifications to the client.
19+
# notify_flush_interval_duration = "100ms"
20+
921
# Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint.
1022
[primary_network]
1123
### Required fields ###

src/agent/pyth/rpc.rs

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use {
4848
fmt::Debug,
4949
net::SocketAddr,
5050
sync::Arc,
51+
time::Duration,
5152
},
5253
tokio::sync::mpsc,
5354
tracing::instrument,
@@ -115,6 +116,7 @@ async fn handle_connection<S>(
115116
state: Arc<S>,
116117
notify_price_tx_buffer: usize,
117118
notify_price_sched_tx_buffer: usize,
119+
notify_flush_interval_duration: Duration,
118120
) where
119121
S: state::Prices,
120122
S: Send,
@@ -127,6 +129,8 @@ async fn handle_connection<S>(
127129
let (mut notify_price_sched_tx, mut notify_price_sched_rx) =
128130
mpsc::channel(notify_price_sched_tx_buffer);
129131

132+
let mut notify_flush_interval = tokio::time::interval(notify_flush_interval_duration);
133+
130134
loop {
131135
if let Err(err) = handle_next(
132136
&*state,
@@ -136,6 +140,7 @@ async fn handle_connection<S>(
136140
&mut notify_price_rx,
137141
&mut notify_price_sched_tx,
138142
&mut notify_price_sched_rx,
143+
&mut notify_flush_interval,
139144
)
140145
.await
141146
{
@@ -159,6 +164,7 @@ async fn handle_next<S>(
159164
notify_price_rx: &mut mpsc::Receiver<NotifyPrice>,
160165
notify_price_sched_tx: &mut mpsc::Sender<NotifyPriceSched>,
161166
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
167+
notify_flush_interval: &mut tokio::time::Interval,
162168
) -> Result<()>
163169
where
164170
S: state::Prices,
@@ -183,13 +189,16 @@ where
183189
}
184190
}
185191
Some(notify_price) = notify_price_rx.recv() => {
186-
send_notification(ws_tx, Method::NotifyPrice, Some(notify_price))
192+
feed_notification(ws_tx, Method::NotifyPrice, Some(notify_price))
187193
.await
188194
}
189195
Some(notify_price_sched) = notify_price_sched_rx.recv() => {
190-
send_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
196+
feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
191197
.await
192198
}
199+
_ = notify_flush_interval.tick() => {
200+
flush(ws_tx).await
201+
}
193202
}
194203
}
195204

@@ -357,18 +366,18 @@ async fn send_error(
357366
send_text(ws_tx, &response.to_string()).await
358367
}
359368

360-
async fn send_notification<T>(
369+
async fn feed_notification<T>(
361370
ws_tx: &mut SplitSink<WebSocket, Message>,
362371
method: Method,
363372
params: Option<T>,
364373
) -> Result<()>
365374
where
366375
T: Sized + Serialize + DeserializeOwned,
367376
{
368-
send_request(ws_tx, IdReq::Notification, method, params).await
377+
feed_request(ws_tx, IdReq::Notification, method, params).await
369378
}
370379

371-
async fn send_request<I, T>(
380+
async fn feed_request<I, T>(
372381
ws_tx: &mut SplitSink<WebSocket, Message>,
373382
id: I,
374383
method: Method,
@@ -379,7 +388,14 @@ where
379388
T: Sized + Serialize + DeserializeOwned,
380389
{
381390
let request = Request::with_params(id, method, params);
382-
send_text(ws_tx, &request.to_string()).await
391+
feed_text(ws_tx, &request.to_string()).await
392+
}
393+
394+
async fn feed_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Result<()> {
395+
ws_tx
396+
.feed(Message::text(msg.to_string()))
397+
.await
398+
.map_err(|e| e.into())
383399
}
384400

385401
async fn send_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Result<()> {
@@ -389,25 +405,33 @@ async fn send_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Resu
389405
.map_err(|e| e.into())
390406
}
391407

408+
async fn flush(ws_tx: &mut SplitSink<WebSocket, Message>) -> Result<()> {
409+
ws_tx.flush().await.map_err(|e| e.into())
410+
}
411+
392412
#[derive(Clone, Debug, Serialize, Deserialize)]
393413
#[serde(default)]
394414
pub struct Config {
395415
/// The address which the websocket API server will listen on.
396-
pub listen_address: String,
416+
pub listen_address: String,
397417
/// Size of the buffer of each Server's channel on which `notify_price` events are
398418
/// received from the Price state.
399-
pub notify_price_tx_buffer: usize,
419+
pub notify_price_tx_buffer: usize,
400420
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
401421
/// received from the Price state.
402-
pub notify_price_sched_tx_buffer: usize,
422+
pub notify_price_sched_tx_buffer: usize,
423+
/// Flush interval duration for the notifications.
424+
#[serde(with = "humantime_serde")]
425+
pub notify_flush_interval_duration: Duration,
403426
}
404427

405428
impl Default for Config {
406429
fn default() -> Self {
407430
Self {
408-
listen_address: "127.0.0.1:8910".to_string(),
409-
notify_price_tx_buffer: 10000,
410-
notify_price_sched_tx_buffer: 10000,
431+
listen_address: "127.0.0.1:8910".to_string(),
432+
notify_price_tx_buffer: 10000,
433+
notify_price_sched_tx_buffer: 10000,
434+
notify_flush_interval_duration: Duration::from_millis(50),
411435
}
412436
}
413437
}
@@ -448,6 +472,7 @@ where
448472
state,
449473
config.notify_price_tx_buffer,
450474
config.notify_price_sched_tx_buffer,
475+
config.notify_flush_interval_duration,
451476
)
452477
.await
453478
})

0 commit comments

Comments
 (0)