|
| 1 | +use super::super::Track; |
| 2 | +use super::Broadcaster; |
| 3 | +use async_trait::async_trait; |
| 4 | +use futures_util::{SinkExt, StreamExt}; |
| 5 | +use tokio::net::TcpListener; |
| 6 | +use tokio::sync::broadcast::{self, Sender}; |
| 7 | +use tokio_tungstenite::accept_async; |
| 8 | +use tokio_tungstenite::tungstenite::Message; |
| 9 | + |
| 10 | +pub struct WebSocketBroadcaster { |
| 11 | + bind_addr: String, |
| 12 | + sender: Sender<Track>, |
| 13 | +} |
| 14 | + |
| 15 | +impl WebSocketBroadcaster { |
| 16 | + pub fn new(bind_addr: String) -> Self { |
| 17 | + let (sender, _) = broadcast::channel(100); |
| 18 | + Self { bind_addr, sender } |
| 19 | + } |
| 20 | + |
| 21 | + async fn handle_client( |
| 22 | + stream: tokio::net::TcpStream, |
| 23 | + mut receiver: tokio::sync::broadcast::Receiver<Track>, |
| 24 | + ) { |
| 25 | + if let Ok(ws_stream) = accept_async(stream).await { |
| 26 | + let (mut sink, _) = ws_stream.split(); |
| 27 | + while let Ok(track) = receiver.recv().await { |
| 28 | + let csv = format!("{}\n", track.serialize()); |
| 29 | + let _ = sink.send(Message::Text(csv.into())).await; |
| 30 | + } |
| 31 | + } |
| 32 | + } |
| 33 | +} |
| 34 | + |
| 35 | +#[async_trait] |
| 36 | +impl Broadcaster for WebSocketBroadcaster { |
| 37 | + async fn start(&self) { |
| 38 | + let listener = TcpListener::bind(&self.bind_addr).await.unwrap(); |
| 39 | + let sender = self.sender.clone(); |
| 40 | + |
| 41 | + println!("📡 WebSocket broadcaster listening on {}", self.bind_addr); |
| 42 | + |
| 43 | + loop { |
| 44 | + let (stream, _) = listener.accept().await.unwrap(); |
| 45 | + let receiver = sender.subscribe(); |
| 46 | + tokio::spawn(Self::handle_client(stream, receiver)); |
| 47 | + } |
| 48 | + } |
| 49 | + |
| 50 | + fn sender(&self) -> Sender<Track> { |
| 51 | + self.sender.clone() |
| 52 | + } |
| 53 | +} |
0 commit comments