1- use std:: { collections:: HashMap , sync :: Arc , fmt :: Debug } ;
1+ use std:: { collections:: HashMap , fmt :: Debug , sync :: { atomic :: { AtomicI32 , Ordering } , Arc } } ;
22
33use async_tungstenite:: tungstenite:: { Message , self } ;
44use futures:: { prelude:: * , channel:: mpsc:: { Sender , self } , stream:: { SplitSink , SplitStream } , lock:: Mutex } ;
@@ -9,15 +9,16 @@ use tracing::{warn, error, debug, info};
99use crate :: { Check , Error , Result , Spawner } ;
1010
1111/// A connection to the lighthouse server for sending requests and receiving events.
12+ #[ derive( Clone ) ]
1213pub struct Lighthouse < S > {
1314 /// The sink-part of the WebSocket connection.
14- ws_sink : SplitSink < S , Message > ,
15+ ws_sink : Arc < Mutex < SplitSink < S , Message > > > ,
1516 /// The response/event slots, keyed by request id.
1617 slots : Arc < Mutex < HashMap < i32 , Slot < ServerMessage < Value > > > > > ,
1718 /// The credentials used to authenticate with the lighthouse.
1819 authentication : Authentication ,
1920 /// The next request id. Incremented on every request.
20- request_id : i32 ,
21+ request_id : Arc < AtomicI32 > ,
2122}
2223
2324/// A facility for coordinating asynchronous responses to a request between a
@@ -47,10 +48,10 @@ impl<S> Lighthouse<S>
4748 let ( ws_sink, ws_stream) = web_socket. split ( ) ;
4849 let slots = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
4950 let lh = Self {
50- ws_sink,
51+ ws_sink : Arc :: new ( Mutex :: new ( ws_sink ) ) ,
5152 slots : slots. clone ( ) ,
5253 authentication,
53- request_id : 0 ,
54+ request_id : Arc :: new ( AtomicI32 :: new ( 0 ) ) ,
5455 } ;
5556 W :: spawn ( Self :: run_receive_loop ( ws_stream, slots) ) ;
5657 Ok ( lh)
@@ -220,9 +221,8 @@ impl<S> Lighthouse<S>
220221 where
221222 P : Serialize {
222223 let path = path. into_iter ( ) . map ( |s| s. to_string ( ) ) . collect ( ) ;
223- let request_id = self . request_id ;
224+ let request_id = self . request_id . fetch_add ( 1 , Ordering :: Relaxed ) ;
224225 debug ! { %request_id, "Sending request" } ;
225- self . request_id += 1 ;
226226 self . send_message ( & ClientMessage {
227227 request_id,
228228 authentication : self . authentication . clone ( ) ,
@@ -292,7 +292,7 @@ impl<S> Lighthouse<S>
292292
293293 /// Sends raw bytes to the lighthouse via the WebSocket connection.
294294 async fn send_raw ( & mut self , bytes : impl Into < Vec < u8 > > + Debug ) -> Result < ( ) > {
295- Ok ( self . ws_sink . send ( Message :: Binary ( bytes. into ( ) ) ) . await ?)
295+ Ok ( self . ws_sink . lock ( ) . await . send ( Message :: Binary ( bytes. into ( ) ) ) . await ?)
296296 }
297297
298298 /// Fetches the credentials used to authenticate with the lighthouse.
0 commit comments