18
18
anyhow,
19
19
Result ,
20
20
} ,
21
+ futures:: future:: OptionFuture ,
21
22
futures_util:: {
22
23
stream:: {
23
24
SplitSink ,
50
51
sync:: Arc ,
51
52
time:: Duration ,
52
53
} ,
53
- tokio:: sync:: mpsc,
54
+ tokio:: {
55
+ sync:: mpsc,
56
+ time:: Interval ,
57
+ } ,
54
58
tracing:: instrument,
55
59
warp:: {
56
60
ws:: {
@@ -111,11 +115,18 @@ enum ConnectionError {
111
115
WebsocketConnectionClosed ,
112
116
}
113
117
118
+ #[ derive( Debug ) ]
119
+ enum FlushStrategy {
120
+ Instant ,
121
+ Interval ( Interval ) ,
122
+ }
123
+
114
124
async fn handle_connection < S > (
115
125
ws_conn : WebSocket ,
116
126
state : Arc < S > ,
117
127
notify_price_tx_buffer : usize ,
118
128
notify_price_sched_tx_buffer : usize ,
129
+ instant_flush : bool ,
119
130
flush_interval_duration : Duration ,
120
131
) where
121
132
S : state:: Prices ,
@@ -129,7 +140,10 @@ async fn handle_connection<S>(
129
140
let ( mut notify_price_sched_tx, mut notify_price_sched_rx) =
130
141
mpsc:: channel ( notify_price_sched_tx_buffer) ;
131
142
132
- let mut flush_interval = tokio:: time:: interval ( flush_interval_duration) ;
143
+ let mut flush_strategy = match instant_flush {
144
+ true => FlushStrategy :: Instant ,
145
+ false => FlushStrategy :: Interval ( tokio:: time:: interval ( flush_interval_duration) ) ,
146
+ } ;
133
147
134
148
loop {
135
149
if let Err ( err) = handle_next (
@@ -140,7 +154,7 @@ async fn handle_connection<S>(
140
154
& mut notify_price_rx,
141
155
& mut notify_price_sched_tx,
142
156
& mut notify_price_sched_rx,
143
- & mut flush_interval ,
157
+ & mut flush_strategy ,
144
158
)
145
159
. await
146
160
{
@@ -156,6 +170,7 @@ async fn handle_connection<S>(
156
170
}
157
171
}
158
172
173
+ #[ allow( clippy:: too_many_arguments) ]
159
174
async fn handle_next < S > (
160
175
state : & S ,
161
176
ws_tx : & mut SplitSink < WebSocket , Message > ,
@@ -164,11 +179,17 @@ async fn handle_next<S>(
164
179
notify_price_rx : & mut mpsc:: Receiver < NotifyPrice > ,
165
180
notify_price_sched_tx : & mut mpsc:: Sender < NotifyPriceSched > ,
166
181
notify_price_sched_rx : & mut mpsc:: Receiver < NotifyPriceSched > ,
167
- flush_interval : & mut tokio :: time :: Interval ,
182
+ flush_strategy : & mut FlushStrategy ,
168
183
) -> Result < ( ) >
169
184
where
170
185
S : state:: Prices ,
171
186
{
187
+ let optional_flush_tick: OptionFuture < _ > = match flush_strategy {
188
+ FlushStrategy :: Instant => None ,
189
+ FlushStrategy :: Interval ( interval) => Some ( interval. tick ( ) ) ,
190
+ }
191
+ . into ( ) ;
192
+
172
193
tokio:: select! {
173
194
msg = ws_rx. next( ) => {
174
195
match msg {
@@ -196,9 +217,14 @@ where
196
217
feed_notification( ws_tx, Method :: NotifyPriceSched , Some ( notify_price_sched) )
197
218
. await
198
219
}
199
- _ = flush_interval . tick ( ) => {
220
+ Some ( _ ) = optional_flush_tick => {
200
221
flush( ws_tx) . await
201
222
}
223
+ } ?;
224
+
225
+ match flush_strategy {
226
+ FlushStrategy :: Interval ( _) => Ok ( ( ) ) ,
227
+ FlushStrategy :: Instant => flush ( ws_tx) . await ,
202
228
}
203
229
}
204
230
@@ -413,6 +439,8 @@ pub struct Config {
413
439
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
414
440
/// received from the Price state.
415
441
pub notify_price_sched_tx_buffer : usize ,
442
+ /// Whether to flush immediately after sending a message or notification.
443
+ pub instant_flush : bool ,
416
444
/// Flush interval duration for the notifications.
417
445
#[ serde( with = "humantime_serde" ) ]
418
446
pub flush_interval_duration : Duration ,
@@ -424,6 +452,7 @@ impl Default for Config {
424
452
listen_address : "127.0.0.1:8910" . to_string ( ) ,
425
453
notify_price_tx_buffer : 10000 ,
426
454
notify_price_sched_tx_buffer : 10000 ,
455
+ instant_flush : true ,
427
456
flush_interval_duration : Duration :: from_millis ( 50 ) ,
428
457
}
429
458
}
@@ -465,6 +494,7 @@ where
465
494
state,
466
495
config. notify_price_tx_buffer ,
467
496
config. notify_price_sched_tx_buffer ,
497
+ config. instant_flush ,
468
498
config. flush_interval_duration ,
469
499
)
470
500
. await
0 commit comments