1515// You should have received a copy of the GNU General Public License
1616// along with OpenMirroring. If not, see <https://www.gnu.org/licenses/>.
1717
18- use std:: {
19- io:: { Read , Write } ,
20- net:: { SocketAddr , TcpStream } ,
21- rc:: Rc ,
22- thread:: { self , JoinHandle , sleep} ,
23- time:: Duration ,
24- } ;
18+ use std:: { net:: SocketAddr , rc:: Rc , thread:: sleep, time:: Duration } ;
2519
2620use anyhow:: Result ;
21+ use common:: sender:: session:: { Session , SessionEvent } ;
2722use crossbeam_channel:: Receiver ;
2823use fcast_lib:: packet:: Packet ;
2924use log:: { debug, error} ;
@@ -46,13 +41,15 @@ enum Event {
4641struct Application {
4742 ui_weak : Weak < MainWindow > ,
4843 receivers : Vec < ( ReceiverItem , SocketAddr ) > ,
44+ session : Session ,
4945}
5046
5147impl Application {
5248 pub fn new ( ui_weak : Weak < MainWindow > ) -> Self {
5349 Self {
5450 ui_weak,
5551 receivers : Vec :: new ( ) ,
52+ session : Session :: default ( ) ,
5653 }
5754 }
5855
@@ -77,14 +74,6 @@ impl Application {
7774 ReceiverState :: Connectable
7875 }
7976
80- // fn set_all_receivers_connectable(&mut self) {
81- // for r in &mut self.receivers {
82- // if r.0.state != ReceiverState::Connectable {
83- // r.0.state = ReceiverState::Connectable;
84- // }
85- // }
86- // }
87-
8877 fn update_receivers_in_ui ( & mut self ) -> Result < ( ) > {
8978 let g_state = self . receivers_general_state ( ) ;
9079 for r in & mut self . receivers {
@@ -155,10 +144,6 @@ impl Application {
155144 let mdns = mdns_sd:: ServiceDaemon :: new ( ) ?;
156145 let mdns_receiver = mdns. browse ( "_fcast._tcp.local." ) ?;
157146
158- // TODO: move session to own thread
159- let mut session_stream_jh = None :: < JoinHandle < Result < TcpStream , std:: io:: Error > > > ;
160- let mut session_stream = None :: < TcpStream > ;
161-
162147 loop {
163148 match event_rx. try_recv ( ) {
164149 Ok ( event) => match event {
@@ -173,8 +158,7 @@ impl Application {
173158 ) ?;
174159
175160 let addr = self . receivers [ idx] . 1 ;
176- session_stream_jh =
177- Some ( thread:: spawn ( move || TcpStream :: connect ( addr) ) ) ;
161+ self . session . connect ( addr) ;
178162
179163 self . update_receivers_in_ui ( ) ?;
180164 } else {
@@ -185,8 +169,9 @@ impl Application {
185169 self . add_receiver ( name, addresses) ?;
186170 }
187171 Event :: DisconnectReceiver => {
188- let _ = session_stream_jh. take ( ) ;
189- let _ = session_stream. take ( ) ;
172+ if let Err ( err) = self . session . disconnect ( ) {
173+ error ! ( "Failed to disconnect from receiver: {err}" ) ;
174+ }
190175
191176 for r in & mut self . receivers {
192177 if r. 0 . state == ReceiverState :: Connected
@@ -204,9 +189,14 @@ impl Application {
204189 ) ?;
205190 }
206191 Event :: SendPacket ( packet) => {
207- if let Some ( stream ) = session_stream . as_mut ( ) {
192+ if self . session . is_connected ( ) {
208193 self . push_message ( MessageDirection :: Out , format ! ( "{packet:?}" ) ) ?;
209- stream. write_all ( & packet. encode ( ) ) ?;
194+ if let Err ( err) = self . session . send_packet ( packet) {
195+ error ! ( "Failed to send packet: {err}" ) ;
196+ if let Err ( err) = self . session . disconnect ( ) {
197+ error ! ( "Failed to disconnect from receiver: {err}" ) ;
198+ }
199+ }
210200 }
211201 }
212202 } ,
@@ -252,9 +242,22 @@ impl Application {
252242 }
253243 }
254244
255- if let Some ( jh) = session_stream_jh. take_if ( |jh| jh. is_finished ( ) ) {
256- match jh. join ( ) . unwrap ( ) {
257- Ok ( stream) => {
245+ match self . session . poll_event ( ) {
246+ Ok ( Some ( event) ) => match event {
247+ SessionEvent :: Packet ( packet) => {
248+ self . push_message ( MessageDirection :: In , format ! ( "{packet:?}" ) ) ?;
249+ if packet == Packet :: Ping {
250+ let packet = Packet :: Pong ;
251+ self . push_message ( MessageDirection :: Out , format ! ( "{packet:?}" ) ) ?;
252+ if let Err ( err) = self . session . send_packet ( packet) {
253+ error ! ( "Failed to send packet: {err}" ) ;
254+ if let Err ( err) = self . session . disconnect ( ) {
255+ error ! ( "Failed to disconnect from receiver: {err}" ) ;
256+ }
257+ }
258+ }
259+ }
260+ SessionEvent :: Connected => {
258261 debug ! ( "Successfully connected to receiver" ) ;
259262
260263 self . push_message (
@@ -269,60 +272,14 @@ impl Application {
269272 }
270273 }
271274
272- // self.ui_weak.upgrade_in_event_loop(|ui| {
273- // ui.invoke_receiver_connected();
274- // })?;
275-
276275 self . update_receivers_in_ui ( ) ?;
277-
278- stream. set_nonblocking ( true ) ?;
279- session_stream = Some ( stream) ;
280- }
281- Err ( err) => {
282- error ! ( "Failed to connect to receiver: {err}" ) ;
283- self . push_message (
284- MessageDirection :: Info ,
285- format ! ( "Failed to connect to receiver: {err}" ) ,
286- ) ?;
287- }
288- }
289- }
290-
291- ' out: {
292- if let Some ( stream) = session_stream. as_mut ( ) {
293- let mut header_buf = [ 0u8 ; 5 ] ;
294- if let Err ( err) = stream. read_exact ( & mut header_buf) {
295- if err. kind ( ) != std:: io:: ErrorKind :: WouldBlock {
296- return Err ( err. into ( ) ) ;
297- }
298- break ' out;
299- }
300-
301- let header = fcast_lib:: models:: Header :: decode ( header_buf) ;
302-
303- let mut body_string = String :: new ( ) ;
304-
305- if header. size > 0 {
306- let mut body_buf = vec ! [ 0 ; header. size as usize ] ;
307- // TODO: this can fail badly, need to read the whole packet
308- if let Err ( err) = stream. read_exact ( & mut body_buf) {
309- if err. kind ( ) != std:: io:: ErrorKind :: WouldBlock {
310- return Err ( err. into ( ) ) ;
311- }
312- error ! ( "Failed to read from receiver (cuz nonblocking)" ) ;
313- break ' out;
314- }
315- body_string = String :: from_utf8 ( body_buf) ?;
316- }
317-
318- let packet = Packet :: decode ( header, & body_string) ?;
319- self . push_message ( MessageDirection :: In , format ! ( "{packet:?}" ) ) ?;
320- if packet == Packet :: Ping {
321- let packet = Packet :: Pong ;
322- self . push_message ( MessageDirection :: Out , format ! ( "{packet:?}" ) ) ?;
323- stream. write_all ( & packet. encode ( ) ) ?;
324276 }
277+ } ,
278+ Err ( err) => {
279+ error ! ( "Failed to poll session event: {err}" ) ;
280+ self . session . disconnect ( ) ?;
325281 }
282+ _ => ( ) ,
326283 }
327284
328285 sleep ( Duration :: from_millis ( 25 ) ) ;
0 commit comments