|
45 | 45 |
|
46 | 46 | const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30);
|
47 | 47 | const MAX_CLIENT_MESSAGE_SIZE: usize = 100 * 1024; // 100 KiB
|
| 48 | +const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours |
48 | 49 |
|
49 | 50 | /// The maximum number of bytes that can be sent per second per IP address.
|
50 | 51 | /// If the limit is exceeded, the connection is closed.
|
@@ -252,6 +253,7 @@ pub struct Subscriber<S> {
|
252 | 253 | sender: SplitSink<WebSocket, Message>,
|
253 | 254 | price_feeds_with_config: HashMap<PriceIdentifier, PriceFeedClientConfig>,
|
254 | 255 | ping_interval: tokio::time::Interval,
|
| 256 | + connection_timeout: tokio::time::Sleep, |
255 | 257 | exit: watch::Receiver<bool>,
|
256 | 258 | responded_to_ping: bool,
|
257 | 259 | }
|
@@ -280,6 +282,7 @@ where
|
280 | 282 | sender,
|
281 | 283 | price_feeds_with_config: HashMap::new(),
|
282 | 284 | ping_interval: tokio::time::interval(PING_INTERVAL_DURATION),
|
| 285 | + connection_timeout: tokio::time::sleep(MAX_CONNECTION_DURATION), |
283 | 286 | exit: crate::EXIT.subscribe(),
|
284 | 287 | responded_to_ping: true, // We start with true so we don't close the connection immediately
|
285 | 288 | }
|
@@ -325,6 +328,26 @@ where
|
325 | 328 | self.sender.send(Message::Ping(vec![])).await?;
|
326 | 329 | Ok(())
|
327 | 330 | },
|
| 331 | + _ = &mut self.connection_timeout => { |
| 332 | + tracing::info!( |
| 333 | + id = self.id, |
| 334 | + ip = ?self.ip_addr, |
| 335 | + "Connection timeout reached (24h). Closing connection.", |
| 336 | + ); |
| 337 | + self.sender |
| 338 | + .send( |
| 339 | + serde_json::to_string(&ServerMessage::Response( |
| 340 | + ServerResponseMessage::Err { |
| 341 | + error: "Connection timeout reached (24h)".to_string(), |
| 342 | + }, |
| 343 | + ))? |
| 344 | + .into(), |
| 345 | + ) |
| 346 | + .await?; |
| 347 | + self.sender.close().await?; |
| 348 | + self.closed = true; |
| 349 | + return Ok(()); |
| 350 | + }, |
328 | 351 | _ = self.exit.changed() => {
|
329 | 352 | self.sender.close().await?;
|
330 | 353 | self.closed = true;
|
|
0 commit comments