@@ -9,10 +9,116 @@ use crate::{app::Event, protocol::ServerEvent};
99use futures_util:: { SinkExt , StreamExt , TryFutureExt } ;
1010use tokio_websockets:: Message ;
1111
12+ enum SubmitItem {
13+ JSON ( crate :: protocol:: ClientCommand ) ,
14+ AudioChunk ( Vec < u8 > ) ,
15+ Close ,
16+ }
17+
18+ async fn ws_manager (
19+ mut ws : tokio_websockets:: WebSocketStream <
20+ tokio_websockets:: MaybeTlsStream < tokio:: net:: TcpStream > ,
21+ > ,
22+ mut rx : tokio:: sync:: mpsc:: Receiver < SubmitItem > ,
23+ tx : tokio:: sync:: mpsc:: Sender < ServerEvent > ,
24+ ) -> anyhow:: Result < ( ) > {
25+ enum SelectItem {
26+ Recv ( Option < Result < Message , tokio_websockets:: error:: Error > > ) ,
27+ Send ( Option < SubmitItem > ) ,
28+ }
29+
30+ loop {
31+ let recv_fut = ws. next ( ) ;
32+ let send_fut = rx. recv ( ) ;
33+ let item = tokio:: select! {
34+ recv = recv_fut => {
35+ SelectItem :: Recv ( recv)
36+ } ,
37+ send = send_fut => {
38+ SelectItem :: Send ( send)
39+ } ,
40+ } ;
41+
42+ match item {
43+ SelectItem :: Recv ( Some ( Ok ( msg) ) ) => {
44+ if msg. is_binary ( ) {
45+ let payload = msg. into_payload ( ) ;
46+ let evt = rmp_serde:: from_slice :: < ServerEvent > ( & payload)
47+ . map_err ( |e| anyhow:: anyhow!( "Failed to deserialize binary data: {}" , e) ) ?;
48+ tx. send ( evt)
49+ . await
50+ . map_err ( |e| anyhow:: anyhow!( "Failed to send event to channel: {}" , e) ) ?;
51+ } else {
52+ log:: error!( "Unexpected non-binary WebSocket message received" ) ;
53+ continue ;
54+ }
55+ }
56+ SelectItem :: Recv ( None ) => {
57+ log:: info!( "WebSocket stream ended" ) ;
58+ return Ok ( ( ) ) ;
59+ }
60+ SelectItem :: Recv ( Some ( Err ( e) ) ) => {
61+ log:: error!( "WebSocket receive error: {}" , e) ;
62+ return Err ( anyhow:: anyhow!( "WebSocket receive error: {}" , e) ) ;
63+ }
64+ SelectItem :: Send ( Some ( msg) ) => {
65+ log:: info!( "WebSocket message sent" ) ;
66+ match msg {
67+ SubmitItem :: JSON ( cmd) => {
68+ let payload = serde_json:: to_string ( & cmd) . map_err ( |e| {
69+ anyhow:: anyhow!( "Failed to serialize command to JSON: {}" , e)
70+ } ) ?;
71+ let msg = Message :: text ( payload) ;
72+ ws. send ( msg)
73+ . await
74+ . map_err ( |e| anyhow:: anyhow!( "WebSocket send error: {}" , e) ) ?;
75+ }
76+ SubmitItem :: AudioChunk ( chunk) => {
77+ let msg = Message :: binary ( bytes:: Bytes :: from ( chunk) ) ;
78+ ws. send ( msg)
79+ . await
80+ . map_err ( |e| anyhow:: anyhow!( "WebSocket send error: {}" , e) ) ?;
81+ }
82+ SubmitItem :: Close => {
83+ ws. close ( )
84+ . await
85+ . map_err ( |e| anyhow:: anyhow!( "WebSocket close error: {}" , e) ) ?;
86+ log:: info!( "WebSocket closed by client request" ) ;
87+ return Ok ( ( ) ) ;
88+ }
89+ }
90+ }
91+ SelectItem :: Send ( None ) => {
92+ log:: info!( "WebSocket send channel closed" ) ;
93+ return Ok ( ( ) ) ;
94+ }
95+ }
96+ }
97+ }
98+
99+ async fn connect_handler (
100+ ws : tokio_websockets:: WebSocketStream < tokio_websockets:: MaybeTlsStream < tokio:: net:: TcpStream > > ,
101+ ) -> (
102+ tokio:: sync:: mpsc:: Sender < SubmitItem > ,
103+ tokio:: sync:: mpsc:: Receiver < ServerEvent > ,
104+ ) {
105+ let ( tx_ws, rx) = tokio:: sync:: mpsc:: channel :: < SubmitItem > ( 32 ) ;
106+ let ( tx, rx_ws) = tokio:: sync:: mpsc:: channel :: < ServerEvent > ( 32 ) ;
107+
108+ tokio:: spawn ( async move {
109+ if let Err ( e) = ws_manager ( ws, rx, tx) . await {
110+ log:: error!( "WebSocket manager error: {}" , e) ;
111+ }
112+ } ) ;
113+
114+ ( tx_ws, rx_ws)
115+ }
116+
12117pub struct Server {
13118 pub uri : String ,
14119 timeout : std:: time:: Duration ,
15- ws : tokio_websockets:: WebSocketStream < tokio_websockets:: MaybeTlsStream < tokio:: net:: TcpStream > > ,
120+ tx : tokio:: sync:: mpsc:: Sender < SubmitItem > ,
121+ rx : tokio:: sync:: mpsc:: Receiver < ServerEvent > ,
16122}
17123
18124impl Server {
@@ -24,7 +130,14 @@ impl Server {
24130
25131 let timeout = std:: time:: Duration :: from_secs ( 30 ) ;
26132
27- Ok ( Self { uri, timeout, ws } )
133+ let ( tx, rx) = connect_handler ( ws) . await ;
134+
135+ Ok ( Self {
136+ uri,
137+ timeout,
138+ tx,
139+ rx,
140+ } )
28141 }
29142
30143 pub fn set_timeout ( & mut self , timeout : std:: time:: Duration ) {
@@ -39,7 +152,9 @@ impl Server {
39152 . await
40153 . map_err ( |e| anyhow:: anyhow!( "Failed to reconnect: {}" , e) ) ?;
41154
42- self . ws = ws;
155+ let ( tx, rx) = connect_handler ( ws) . await ;
156+ self . tx = tx;
157+ self . rx = rx;
43158 Ok ( ( ) )
44159 }
45160
@@ -62,54 +177,45 @@ impl Server {
62177 }
63178
64179 pub async fn close ( & mut self ) -> anyhow:: Result < ( ) > {
65- self . ws . close ( ) . await ? ;
180+ let _ = self . send ( SubmitItem :: Close ) . await ;
66181 Ok ( ( ) )
67182 }
68183
69- pub async fn send ( & mut self , msg : Message ) -> anyhow:: Result < ( ) > {
70- tokio:: time:: timeout ( self . timeout , self . ws . send ( msg) )
184+ async fn send ( & mut self , msg : SubmitItem ) -> anyhow:: Result < ( ) > {
185+ tokio:: time:: timeout ( self . timeout , self . tx . send ( msg) )
71186 . map_err ( |_| anyhow:: anyhow!( "Timeout sending message" ) )
72- . await ??;
187+ . await ?
188+ . map_err ( |_| anyhow:: anyhow!( "Failed to send message" ) ) ?;
73189 Ok ( ( ) )
74190 }
75191
76192 pub async fn send_client_command (
77193 & mut self ,
78194 cmd : crate :: protocol:: ClientCommand ,
79195 ) -> anyhow:: Result < ( ) > {
80- let payload = serde_json:: to_string ( & cmd)
81- . map_err ( |e| anyhow:: anyhow!( "Failed to serialize command: {}" , e) ) ?;
82- let msg = Message :: text ( payload) ;
83- self . send ( msg ) . await
196+ // let payload = serde_json::to_string(&cmd)
197+ // .map_err(|e| anyhow::anyhow!("Failed to serialize command: {}", e))?;
198+ // let msg = Message::text(payload);
199+ self . send ( SubmitItem :: JSON ( cmd ) ) . await
84200 }
85201
86- pub async fn send_client_audio_chunk ( & mut self , chunk : bytes:: Bytes ) -> anyhow:: Result < ( ) > {
87- let msg = Message :: binary ( chunk) ;
88- self . send ( msg) . await
202+ pub async fn send_client_audio_chunk ( & mut self , chunk : Vec < u8 > ) -> anyhow:: Result < ( ) > {
203+ self . send ( SubmitItem :: AudioChunk ( chunk) ) . await
89204 }
90205
91206 pub async fn send_client_audio_chunk_i16 ( & mut self , chunk : Vec < i16 > ) -> anyhow:: Result < ( ) > {
92207 let audio_buffer_u8 =
93208 unsafe { std:: slice:: from_raw_parts ( chunk. as_ptr ( ) as * const u8 , chunk. len ( ) * 2 ) } ;
94209
95- self . send_client_audio_chunk ( bytes:: Bytes :: from ( audio_buffer_u8) )
96- . await
210+ self . send_client_audio_chunk ( audio_buffer_u8. to_vec ( ) ) . await
97211 }
98212
99213 pub async fn recv ( & mut self ) -> anyhow:: Result < Event > {
100214 let msg = self
101- . ws
102- . next ( )
215+ . rx
216+ . recv ( )
103217 . await
104- . ok_or_else ( || anyhow:: anyhow!( "WS channel closed" ) ) ??;
105-
106- if msg. is_binary ( ) {
107- let payload = msg. into_payload ( ) ;
108- let evt = rmp_serde:: from_slice :: < ServerEvent > ( & payload)
109- . map_err ( |e| anyhow:: anyhow!( "Failed to deserialize binary data: {}" , e) ) ?;
110- Ok ( Event :: ServerEvent ( evt) )
111- } else {
112- Err ( anyhow:: anyhow!( "Invalid message type" ) )
113- }
218+ . ok_or_else ( || anyhow:: anyhow!( "WS channel closed" ) ) ?;
219+ Ok ( Event :: ServerEvent ( msg) )
114220 }
115221}
0 commit comments