1+ use core:: panic;
12use std:: fmt:: Debug ;
23
4+ use crate :: dbus:: CredentialRequest ;
35use async_stream:: stream;
46use futures_lite:: Stream ;
7+ use tokio:: sync:: broadcast;
8+ use tokio:: sync:: mpsc:: { self , Receiver , Sender } ;
9+ use tracing:: { debug, error} ;
10+
11+ use libwebauthn:: transport:: cable:: channel:: { CableUpdate , CableUxUpdate } ;
512use libwebauthn:: transport:: cable:: qr_code_device:: { CableQrCodeDevice , QrCodeOperationHint } ;
6- use libwebauthn:: transport:: Device ;
13+ use libwebauthn:: transport:: { Channel , Device } ;
714use libwebauthn:: webauthn:: { Error as WebAuthnError , WebAuthn } ;
815
9- use crate :: dbus:: CredentialRequest ;
10-
1116use super :: { AuthenticatorResponse , Error } ;
1217
1318pub ( crate ) trait HybridHandler {
@@ -32,7 +37,7 @@ impl HybridHandler for InternalHybridHandler {
3237 ) -> impl Stream < Item = HybridEvent > + Unpin + Send + Sized + ' static {
3338 tracing:: debug!( "Starting hybrid operation" ) ;
3439 let request = request. clone ( ) ;
35- let ( tx, rx) = async_std :: channel:: unbounded ( ) ;
40+ let ( tx, mut rx) = mpsc :: channel ( 16 ) ;
3641 tokio:: spawn ( async move {
3742 let hint = match request {
3843 CredentialRequest :: CreatePublicKeyCredentialRequest ( _) => {
@@ -56,9 +61,14 @@ impl HybridHandler for InternalHybridHandler {
5661 panic ! ( ) ;
5762 }
5863 } ;
59- if let Err ( err) = tx. send ( HybridStateInternal :: Connected ) . await {
60- tracing:: error!( "Failed to send caBLE update: {:?}" , err)
61- }
64+
65+ let state_sender_clone = tx. clone ( ) ;
66+ let ux_updates_rx = channel. get_ux_update_receiver ( ) ;
67+ tokio:: spawn ( async move {
68+ handle_hybrid_updates ( & state_sender_clone, ux_updates_rx) . await ;
69+ debug ! ( "Reached end of Hybrid updates stream." ) ;
70+ } ) ;
71+
6272 tracing:: debug!( "Polling hybrid channel for updates." ) ;
6373 let response: Result < AuthenticatorResponse , Error > = loop {
6474 match & request {
@@ -122,7 +132,7 @@ impl HybridHandler for InternalHybridHandler {
122132 } ) ;
123133 } ) ;
124134 Box :: pin ( stream ! {
125- while let Ok ( state) = rx. recv( ) . await {
135+ while let Some ( state) = rx. recv( ) . await {
126136 yield HybridEvent { state }
127137 }
128138 } )
@@ -194,6 +204,39 @@ impl From<HybridStateInternal> for HybridState {
194204 }
195205}
196206
207+ async fn handle_hybrid_updates (
208+ state_sender : & Sender < HybridStateInternal > ,
209+ mut ux_update_receiver : broadcast:: Receiver < CableUxUpdate > ,
210+ ) {
211+ while let Ok ( msg) = ux_update_receiver. recv ( ) . await {
212+ debug ! ( ?msg, "Received hybrid update" ) ;
213+ let new_state: Option < HybridStateInternal > = match msg {
214+ CableUxUpdate :: UvUpdate ( uv_update) => {
215+ error ! (
216+ "Received unexpected UV update in hybrid handler: {:?}" ,
217+ uv_update
218+ ) ;
219+ None
220+ }
221+ CableUxUpdate :: CableUpdate ( cable_update) => match cable_update {
222+ CableUpdate :: ProximityCheck => None ,
223+ CableUpdate :: Connecting => Some ( HybridStateInternal :: Connecting ) ,
224+ CableUpdate :: Authenticating => Some ( HybridStateInternal :: Connecting ) ,
225+ CableUpdate :: Connected => Some ( HybridStateInternal :: Connected ) ,
226+ CableUpdate :: Error ( transport_error) => {
227+ error ! ( ?transport_error, "Hybrid transport error" ) ;
228+ Some ( HybridStateInternal :: Failed )
229+ }
230+ } ,
231+ } ;
232+ if let Some ( state) = new_state {
233+ if let Err ( err) = state_sender. send ( state. clone ( ) ) . await {
234+ error ! ( { ?err, ?state } , "Failed to send hybrid update" ) ;
235+ }
236+ }
237+ }
238+ }
239+
197240#[ cfg( test) ]
198241pub ( super ) mod test {
199242 use std:: task:: Poll ;
0 commit comments