@@ -36,14 +36,17 @@ use {
36
36
} ,
37
37
std:: {
38
38
collections:: HashMap ,
39
+ pin:: Pin ,
39
40
sync:: atomic:: {
40
41
AtomicUsize ,
41
42
Ordering ,
42
43
} ,
44
+ time:: Duration ,
43
45
} ,
44
46
tokio:: sync:: mpsc,
45
47
} ;
46
48
49
+ pub const PING_INTERVAL_DURATION : Duration = Duration :: from_secs ( 30 ) ;
47
50
48
51
pub async fn ws_route_handler (
49
52
ws : WebSocketUpgrade ,
@@ -83,6 +86,8 @@ pub struct Subscriber {
83
86
receiver : SplitStream < WebSocket > ,
84
87
sender : SplitSink < WebSocket , Message > ,
85
88
price_feeds_with_config : HashMap < PriceIdentifier , PriceFeedClientConfig > ,
89
+ ping_interval_future : Pin < Box < tokio:: time:: Sleep > > ,
90
+ responded_to_ping : bool ,
86
91
}
87
92
88
93
impl Subscriber {
@@ -101,6 +106,8 @@ impl Subscriber {
101
106
receiver,
102
107
sender,
103
108
price_feeds_with_config : HashMap :: new ( ) ,
109
+ ping_interval_future : Box :: pin ( tokio:: time:: sleep ( PING_INTERVAL_DURATION ) ) ,
110
+ responded_to_ping : true , // We start with true so we don't close the connection immediately
104
111
}
105
112
}
106
113
@@ -131,6 +138,16 @@ impl Subscriber {
131
138
Some ( message_or_err) => self . handle_client_message( message_or_err?) . await ?
132
139
}
133
140
} ,
141
+ _ = & mut self . ping_interval_future => {
142
+ if !self . responded_to_ping {
143
+ log:: debug!( "Subscriber {} did not respond to ping, closing connection." , self . id) ;
144
+ self . closed = true ;
145
+ return Ok ( ( ) ) ;
146
+ }
147
+ self . responded_to_ping = false ;
148
+ self . sender. send( Message :: Ping ( vec![ ] ) ) . await ?;
149
+ self . ping_interval_future = Box :: pin( tokio:: time:: sleep( PING_INTERVAL_DURATION ) ) ;
150
+ }
134
151
}
135
152
136
153
Ok ( ( ) )
@@ -180,6 +197,14 @@ impl Subscriber {
180
197
let maybe_client_message = match message {
181
198
Message :: Text ( text) => serde_json:: from_str :: < ClientMessage > ( & text) ,
182
199
Message :: Binary ( data) => serde_json:: from_slice :: < ClientMessage > ( & data) ,
200
+ Message :: Ping ( _) => {
201
+ // Axum will send Pong automatically
202
+ return Ok ( ( ) ) ;
203
+ }
204
+ Message :: Pong ( _) => {
205
+ self . responded_to_ping = true ;
206
+ return Ok ( ( ) ) ;
207
+ }
183
208
_ => {
184
209
return Ok ( ( ) ) ;
185
210
}
@@ -188,7 +213,7 @@ impl Subscriber {
188
213
match maybe_client_message {
189
214
Err ( e) => {
190
215
self . sender
191
- . feed (
216
+ . send (
192
217
serde_json:: to_string ( & ServerMessage :: Response (
193
218
ServerResponseMessage :: Err {
194
219
error : e. to_string ( ) ,
0 commit comments