@@ -6,10 +6,12 @@ use std::{
66 Arc , Mutex ,
77 } ,
88} ;
9-
109use tokio:: sync:: { mpsc, oneshot} ;
1110use tokio_stream:: wrappers:: UnboundedReceiverStream ;
1211use tonic:: { Request , Response , Status , Streaming } ;
12+ use tracing:: Instrument ;
13+
14+ use defguard_version:: { version_info_from_metadata, DefguardComponent } ;
1315
1416use crate :: {
1517 error:: ApiError ,
@@ -89,7 +91,7 @@ impl proxy_server::Proxy for ProxyServer {
8991 type BidiStream = UnboundedReceiverStream < Result < CoreRequest , Status > > ;
9092
9193 /// Handle bidirectional communication with Defguard core.
92- #[ instrument( name = "bidirectional_communication" , level = "debug " , skip( self ) ) ]
94+ #[ instrument( name = "bidirectional_communication" , level = "info " , skip( self ) ) ]
9395 async fn bidi (
9496 & self ,
9597 request : Request < Streaming < CoreResponse > > ,
@@ -98,6 +100,9 @@ impl proxy_server::Proxy for ProxyServer {
98100 error ! ( "Failed to determine client address for request: {request:?}" ) ;
99101 return Err ( Status :: internal ( "Failed to determine client address" ) ) ;
100102 } ;
103+ let ( version, info) = version_info_from_metadata ( request. metadata ( ) ) ;
104+ let span = tracing:: info_span!( "core_bidi_stream" , component = %DefguardComponent :: Core , version, info) ;
105+ let _guard = span. enter ( ) ;
101106 info ! ( "Defguard Core gRPC client connected from: {address}" ) ;
102107
103108 let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
@@ -108,37 +113,40 @@ impl proxy_server::Proxy for ProxyServer {
108113 let results = Arc :: clone ( & self . results ) ;
109114 let connected = Arc :: clone ( & self . connected ) ;
110115 let mut stream = request. into_inner ( ) ;
111- tokio:: spawn ( async move {
112- loop {
113- match stream. message ( ) . await {
114- Ok ( Some ( response) ) => {
115- debug ! ( "Received message from Defguard core: {response:?}" ) ;
116- connected. store ( true , Ordering :: Relaxed ) ;
117- // Discard empty payloads.
118- if let Some ( payload) = response. payload {
119- if let Some ( rx) = results. lock ( ) . unwrap ( ) . remove ( & response. id ) {
120- if let Err ( err) = rx. send ( payload) {
121- error ! ( "Failed to send message to rx: {err:?}" ) ;
116+ tokio:: spawn (
117+ async move {
118+ loop {
119+ match stream. message ( ) . await {
120+ Ok ( Some ( response) ) => {
121+ debug ! ( "Received message from Defguard core: {response:?}" ) ;
122+ connected. store ( true , Ordering :: Relaxed ) ;
123+ // Discard empty payloads.
124+ if let Some ( payload) = response. payload {
125+ if let Some ( rx) = results. lock ( ) . unwrap ( ) . remove ( & response. id ) {
126+ if let Err ( err) = rx. send ( payload) {
127+ error ! ( "Failed to send message to rx: {err:?}" ) ;
128+ }
129+ } else {
130+ error ! ( "Missing receiver for response #{}" , response. id) ;
122131 }
123- } else {
124- error ! ( "Missing receiver for response #{}" , response. id) ;
125132 }
126133 }
127- }
128- Ok ( None ) => {
129- info ! ( "gRPC stream has been closed" ) ;
130- break ;
131- }
132- Err ( err ) => {
133- error ! ( "gRPC client error: {err}" ) ;
134- break ;
134+ Ok ( None ) => {
135+ info ! ( "gRPC stream has been closed" ) ;
136+ break ;
137+ }
138+ Err ( err ) => {
139+ error ! ( "gRPC client error: {err}" ) ;
140+ break ;
141+ }
135142 }
136143 }
144+ info ! ( "Defguard core client disconnected: {address}" ) ;
145+ connected. store ( false , Ordering :: Relaxed ) ;
146+ clients. lock ( ) . unwrap ( ) . remove ( & address) ;
137147 }
138- info ! ( "Defguard core client disconnected: {address}" ) ;
139- connected. store ( false , Ordering :: Relaxed ) ;
140- clients. lock ( ) . unwrap ( ) . remove ( & address) ;
141- } ) ;
148+ . instrument ( tracing:: Span :: current ( ) ) ,
149+ ) ;
142150
143151 Ok ( Response :: new ( UnboundedReceiverStream :: new ( rx) ) )
144152 }
0 commit comments