@@ -2,7 +2,7 @@ use std::fmt::{Display, Formatter};
22use std:: time:: Duration ;
33
44use async_trait:: async_trait;
5- use tokio:: sync:: mpsc;
5+ use tokio:: sync:: { mpsc, watch } ;
66use tokio:: { task, time} ;
77use tracing:: error;
88
@@ -21,6 +21,16 @@ use crate::UvUpdate;
2121use super :: known_devices:: CableKnownDevice ;
2222use super :: qr_code_device:: CableQrCodeDevice ;
2323
24+ #[ derive( Debug , Clone , PartialEq ) ]
25+ pub enum ConnectionState {
26+ /// Connection is being established (proximity check, connecting, authenticating)
27+ Connecting ,
28+ /// Connection is fully established and ready for operations
29+ Connected ,
30+ /// Connection has terminated
31+ Terminated ,
32+ }
33+
2434#[ derive( Debug ) ]
2535pub enum CableChannelDevice < ' d > {
2636 QrCode ( & ' d CableQrCodeDevice ) ,
@@ -34,12 +44,42 @@ pub struct CableChannel {
3444
3545 /// The noise state used for encryption over the WebSocket stream.
3646 // pub(crate) noise_state: TransportState,
37-
38- /// The device that this channel is connected to.
3947 pub ( crate ) handle_connection : task:: JoinHandle < ( ) > ,
4048 pub ( crate ) cbor_sender : mpsc:: Sender < CborRequest > ,
4149 pub ( crate ) cbor_receiver : mpsc:: Receiver < CborResponse > ,
4250 pub ( crate ) tx : mpsc:: Sender < CableUxUpdate > ,
51+ /// Watch receiver for connection state
52+ pub ( crate ) connection_state_rx : watch:: Receiver < ConnectionState > ,
53+ }
54+
55+ impl CableChannel {
56+ async fn wait_for_connection ( & self ) -> Result < ( ) , Error > {
57+ let mut rx = self . connection_state_rx . clone ( ) ;
58+
59+ // If already connected, return immediately
60+ if * rx. borrow ( ) == ConnectionState :: Connected {
61+ return Ok ( ( ) ) ;
62+ }
63+
64+ // If already terminated, return error immediately
65+ if * rx. borrow ( ) == ConnectionState :: Terminated {
66+ return Err ( Error :: Transport ( TransportError :: ConnectionFailed ) ) ;
67+ }
68+
69+ // Wait for state change
70+ while rx. changed ( ) . await . is_ok ( ) {
71+ match * rx. borrow ( ) {
72+ ConnectionState :: Connected => return Ok ( ( ) ) ,
73+ ConnectionState :: Terminated => {
74+ return Err ( Error :: Transport ( TransportError :: ConnectionFailed ) )
75+ }
76+ ConnectionState :: Connecting => continue ,
77+ }
78+ }
79+
80+ // If the sender was dropped, consider it a failure
81+ Err ( Error :: Transport ( TransportError :: ConnectionFailed ) )
82+ }
4383}
4484
4585impl Display for CableChannel {
@@ -57,6 +97,21 @@ impl Drop for CableChannel {
5797#[ derive( Debug ) ]
5898pub enum CableUxUpdate {
5999 UvUpdate ( UvUpdate ) ,
100+ CableUpdate ( CableUpdate ) ,
101+ }
102+
103+ #[ derive( Debug ) ]
104+ pub enum CableUpdate {
105+ /// Waiting for proximity check user interaction (eg. scan a QR code, or confirm on the device).
106+ ProximityCheck ,
107+ /// Connecting to the tunnel server.
108+ Connecting ,
109+ /// Connected to the tunnel server, authenticating the channel.
110+ Authenticating ,
111+ /// Connected to the authenticator device via the tunnel server.
112+ Connected ,
113+ /// The connection to the authenticator device has failed.
114+ Failed ,
60115}
61116
62117impl From < UvUpdate > for CableUxUpdate {
@@ -95,6 +150,10 @@ impl<'d> Channel for CableChannel {
95150 }
96151
97152 async fn cbor_send ( & mut self , request : & CborRequest , timeout : Duration ) -> Result < ( ) , Error > {
153+ // First, wait for connection to be established (no timeout for handshake)
154+ self . wait_for_connection ( ) . await ?;
155+
156+ // Now apply timeout only to the actual CBOR operation
98157 match time:: timeout ( timeout, self . cbor_sender . send ( request. clone ( ) ) ) . await {
99158 Ok ( Ok ( _) ) => Ok ( ( ) ) ,
100159 Ok ( Err ( error) ) => {
@@ -109,6 +168,10 @@ impl<'d> Channel for CableChannel {
109168 }
110169
111170 async fn cbor_recv ( & mut self , timeout : Duration ) -> Result < CborResponse , Error > {
171+ // First, wait for connection to be established (no timeout for handshake)
172+ self . wait_for_connection ( ) . await ?;
173+
174+ // Now apply timeout only to the actual CBOR operation
112175 match time:: timeout ( timeout, self . cbor_receiver . recv ( ) ) . await {
113176 Ok ( Some ( response) ) => Ok ( response) ,
114177 Ok ( None ) => Err ( Error :: Transport ( TransportError :: TransportUnavailable ) ) ,
0 commit comments