Skip to content

Commit 32d2c05

Browse files
committed
fix: feed rpc respones and notifications instead of send and flush
1 parent efc17f5 commit 32d2c05

File tree

2 files changed

+41
-11
lines changed

2 files changed

+41
-11
lines changed

config/config.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,18 @@
66
# connection is not exposed for unauthorized access.
77
listen_address = "127.0.0.1:8910"
88

9+
# Size of the buffer of each Server's channel on which `notify_price` events are
10+
# received from the Price state.
11+
# notify_price_tx_buffer = 10000
12+
13+
# Size of the buffer of each Server's channel on which `notify_price_sched` events are
14+
# received from the Price state.
15+
# notify_price_sched_tx_buffer = 10000
16+
17+
# Flush interval for responses and notifications. This is the maximum time the
18+
# server will wait before flushing the messages to the client.
19+
# flush_interval_duration = "100ms"
20+
921
# Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint.
1022
[primary_network]
1123
### Required fields ###

src/agent/pyth/rpc.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use {
4848
fmt::Debug,
4949
net::SocketAddr,
5050
sync::Arc,
51+
time::Duration,
5152
},
5253
tokio::sync::mpsc,
5354
tracing::instrument,
@@ -115,6 +116,7 @@ async fn handle_connection<S>(
115116
state: Arc<S>,
116117
notify_price_tx_buffer: usize,
117118
notify_price_sched_tx_buffer: usize,
119+
flush_interval_duration: Duration,
118120
) where
119121
S: state::Prices,
120122
S: Send,
@@ -127,6 +129,8 @@ async fn handle_connection<S>(
127129
let (mut notify_price_sched_tx, mut notify_price_sched_rx) =
128130
mpsc::channel(notify_price_sched_tx_buffer);
129131

132+
let mut flush_interval = tokio::time::interval(flush_interval_duration);
133+
130134
loop {
131135
if let Err(err) = handle_next(
132136
&*state,
@@ -136,6 +140,7 @@ async fn handle_connection<S>(
136140
&mut notify_price_rx,
137141
&mut notify_price_sched_tx,
138142
&mut notify_price_sched_rx,
143+
&mut flush_interval,
139144
)
140145
.await
141146
{
@@ -159,6 +164,7 @@ async fn handle_next<S>(
159164
notify_price_rx: &mut mpsc::Receiver<NotifyPrice>,
160165
notify_price_sched_tx: &mut mpsc::Sender<NotifyPriceSched>,
161166
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
167+
notify_flush_interval: &mut tokio::time::Interval,
162168
) -> Result<()>
163169
where
164170
S: state::Prices,
@@ -183,13 +189,16 @@ where
183189
}
184190
}
185191
Some(notify_price) = notify_price_rx.recv() => {
186-
send_notification(ws_tx, Method::NotifyPrice, Some(notify_price))
192+
feed_notification(ws_tx, Method::NotifyPrice, Some(notify_price))
187193
.await
188194
}
189195
Some(notify_price_sched) = notify_price_sched_rx.recv() => {
190-
send_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
196+
feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
191197
.await
192198
}
199+
_ = notify_flush_interval.tick() => {
200+
flush(ws_tx).await
201+
}
193202
}
194203
}
195204

@@ -229,9 +238,9 @@ where
229238
// Send an array if we're handling a batch
230239
// request, single response object otherwise
231240
if is_batch {
232-
send_text(ws_tx, &serde_json::to_string(&responses)?).await?;
241+
feed_text(ws_tx, &serde_json::to_string(&responses)?).await?;
233242
} else {
234-
send_text(ws_tx, &serde_json::to_string(&responses[0])?).await?;
243+
feed_text(ws_tx, &serde_json::to_string(&responses[0])?).await?;
235244
}
236245
}
237246
// The top-level parsing errors are fine to share with client
@@ -354,21 +363,21 @@ async fn send_error(
354363
error.to_string(),
355364
None,
356365
);
357-
send_text(ws_tx, &response.to_string()).await
366+
feed_text(ws_tx, &response.to_string()).await
358367
}
359368

360-
async fn send_notification<T>(
369+
async fn feed_notification<T>(
361370
ws_tx: &mut SplitSink<WebSocket, Message>,
362371
method: Method,
363372
params: Option<T>,
364373
) -> Result<()>
365374
where
366375
T: Sized + Serialize + DeserializeOwned,
367376
{
368-
send_request(ws_tx, IdReq::Notification, method, params).await
377+
feed_request(ws_tx, IdReq::Notification, method, params).await
369378
}
370379

371-
async fn send_request<I, T>(
380+
async fn feed_request<I, T>(
372381
ws_tx: &mut SplitSink<WebSocket, Message>,
373382
id: I,
374383
method: Method,
@@ -379,16 +388,20 @@ where
379388
T: Sized + Serialize + DeserializeOwned,
380389
{
381390
let request = Request::with_params(id, method, params);
382-
send_text(ws_tx, &request.to_string()).await
391+
feed_text(ws_tx, &request.to_string()).await
383392
}
384393

385-
async fn send_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Result<()> {
394+
async fn feed_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Result<()> {
386395
ws_tx
387-
.send(Message::text(msg.to_string()))
396+
.feed(Message::text(msg.to_string()))
388397
.await
389398
.map_err(|e| e.into())
390399
}
391400

401+
async fn flush(ws_tx: &mut SplitSink<WebSocket, Message>) -> Result<()> {
402+
ws_tx.flush().await.map_err(|e| e.into())
403+
}
404+
392405
#[derive(Clone, Debug, Serialize, Deserialize)]
393406
#[serde(default)]
394407
pub struct Config {
@@ -400,6 +413,9 @@ pub struct Config {
400413
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
401414
/// received from the Price state.
402415
pub notify_price_sched_tx_buffer: usize,
416+
/// Flush interval duration for the notifications.
417+
#[serde(with = "humantime_serde")]
418+
pub flush_interval_duration: Duration,
403419
}
404420

405421
impl Default for Config {
@@ -408,6 +424,7 @@ impl Default for Config {
408424
listen_address: "127.0.0.1:8910".to_string(),
409425
notify_price_tx_buffer: 10000,
410426
notify_price_sched_tx_buffer: 10000,
427+
flush_interval_duration: Duration::from_millis(50),
411428
}
412429
}
413430
}
@@ -448,6 +465,7 @@ where
448465
state,
449466
config.notify_price_tx_buffer,
450467
config.notify_price_sched_tx_buffer,
468+
config.flush_interval_duration,
451469
)
452470
.await
453471
})

0 commit comments

Comments
 (0)