@@ -12,7 +12,7 @@ use axum::{
1212 Json ,
1313} ;
1414use futures_util:: { SinkExt , StreamExt } ;
15- use metrics:: { counter, histogram} ;
15+ use metrics:: { counter, gauge , histogram} ;
1616use serde:: { Deserialize , Serialize } ;
1717use tokio:: time:: { timeout, Duration } ;
1818use tokio_tungstenite:: { connect_async, tungstenite:: Message as TungsteniteMessage } ;
@@ -356,6 +356,7 @@ pub async fn ws_proxy(
356356 Some ( k) => k,
357357 None => {
358358 info ! ( "WebSocket: No API key provided from {}" , addr) ;
359+ counter ! ( "ws_connections_total" , "backend" => "none" , "owner" => "none" , "status" => "auth_failed" ) . increment ( 1 ) ;
359360 return ( StatusCode :: UNAUTHORIZED , "Unauthorized" ) . into_response ( ) ;
360361 }
361362 } ;
@@ -367,6 +368,7 @@ pub async fn ws_proxy(
367368 }
368369 Ok ( None ) => {
369370 info ! ( "WebSocket: Invalid API key from {} (prefix={}...)" , addr, & api_key[ ..api_key. len( ) . min( 6 ) ] ) ;
371+ counter ! ( "ws_connections_total" , "backend" => "none" , "owner" => "none" , "status" => "auth_failed" ) . increment ( 1 ) ;
370372 return ( StatusCode :: UNAUTHORIZED , "Unauthorized" ) . into_response ( ) ;
371373 }
372374 Err ( e) => {
@@ -375,9 +377,11 @@ pub async fn ws_proxy(
375377 "WebSocket: API key rate limited from {} (prefix={}...)" ,
376378 addr, & api_key[ ..api_key. len( ) . min( 6 ) ]
377379 ) ;
380+ counter ! ( "ws_connections_total" , "backend" => "none" , "owner" => "none" , "status" => "rate_limited" ) . increment ( 1 ) ;
378381 return ( StatusCode :: TOO_MANY_REQUESTS , "Rate limit exceeded" ) . into_response ( ) ;
379382 }
380383 error ! ( "WebSocket: Key validation error: {}" , e) ;
384+ counter ! ( "ws_connections_total" , "backend" => "none" , "owner" => "none" , "status" => "error" ) . increment ( 1 ) ;
381385 return ( StatusCode :: INTERNAL_SERVER_ERROR , "Internal Server Error" ) . into_response ( ) ;
382386 }
383387 } ;
@@ -387,6 +391,7 @@ pub async fn ws_proxy(
387391 Some ( selection) => selection,
388392 None => {
389393 error ! ( "No healthy WebSocket backends available" ) ;
394+ counter ! ( "ws_connections_total" , "backend" => "none" , "owner" => owner. clone( ) , "status" => "no_backend" ) . increment ( 1 ) ;
390395 return (
391396 StatusCode :: SERVICE_UNAVAILABLE ,
392397 "No healthy WebSocket backends available" ,
@@ -404,7 +409,7 @@ pub async fn ws_proxy(
404409 ) ;
405410
406411 ws. on_upgrade ( move |client_socket| {
407- handle_ws_connection ( client_socket, backend_ws_url, backend_label, addr)
412+ handle_ws_connection ( client_socket, backend_ws_url, backend_label, owner , addr)
408413 } )
409414 . into_response ( )
410415}
@@ -413,6 +418,7 @@ async fn handle_ws_connection(
413418 client_socket : WebSocket ,
414419 backend_url : String ,
415420 backend_label : String ,
421+ owner : String ,
416422 client_addr : SocketAddr ,
417423) {
418424 // Connect to the backend WebSocket
@@ -423,10 +429,15 @@ async fn handle_ws_connection(
423429 "WebSocket: Failed to connect to backend {} ({}): {}" ,
424430 backend_label, backend_url, e
425431 ) ;
432+ counter ! ( "ws_connections_total" , "backend" => backend_label, "owner" => owner, "status" => "backend_connect_failed" ) . increment ( 1 ) ;
426433 return ;
427434 }
428435 } ;
429436
437+ counter ! ( "ws_connections_total" , "backend" => backend_label. clone( ) , "owner" => owner. clone( ) , "status" => "connected" ) . increment ( 1 ) ;
438+ gauge ! ( "ws_active_connections" , "backend" => backend_label. clone( ) , "owner" => owner. clone( ) ) . increment ( 1.0 ) ;
439+ let connect_time = std:: time:: Instant :: now ( ) ;
440+
430441 info ! (
431442 "WebSocket: {} connected to backend {}" ,
432443 client_addr, backend_label
@@ -436,11 +447,18 @@ async fn handle_ws_connection(
436447 let ( mut client_write, mut client_read) = client_socket. split ( ) ;
437448 let ( mut backend_write, mut backend_read) = backend_socket. split ( ) ;
438449
450+ // Clones for use inside async blocks
451+ let bl1 = backend_label. clone ( ) ;
452+ let ow1 = owner. clone ( ) ;
453+ let bl2 = backend_label. clone ( ) ;
454+ let ow2 = owner. clone ( ) ;
455+
439456 // Forward client -> backend
440457 let client_to_backend = async {
441458 while let Some ( msg) = client_read. next ( ) . await {
442459 match msg {
443460 Ok ( Message :: Text ( text) ) => {
461+ counter ! ( "ws_messages_total" , "backend" => bl1. clone( ) , "owner" => ow1. clone( ) , "direction" => "client_to_backend" ) . increment ( 1 ) ;
444462 if backend_write
445463 . send ( TungsteniteMessage :: Text ( text) )
446464 . await
@@ -450,6 +468,7 @@ async fn handle_ws_connection(
450468 }
451469 }
452470 Ok ( Message :: Binary ( data) ) => {
471+ counter ! ( "ws_messages_total" , "backend" => bl1. clone( ) , "owner" => ow1. clone( ) , "direction" => "client_to_backend" ) . increment ( 1 ) ;
453472 if backend_write
454473 . send ( TungsteniteMessage :: Binary ( data) )
455474 . await
@@ -486,11 +505,13 @@ async fn handle_ws_connection(
486505 while let Some ( msg) = backend_read. next ( ) . await {
487506 match msg {
488507 Ok ( TungsteniteMessage :: Text ( text) ) => {
508+ counter ! ( "ws_messages_total" , "backend" => bl2. clone( ) , "owner" => ow2. clone( ) , "direction" => "backend_to_client" ) . increment ( 1 ) ;
489509 if client_write. send ( Message :: Text ( text) ) . await . is_err ( ) {
490510 break ;
491511 }
492512 }
493513 Ok ( TungsteniteMessage :: Binary ( data) ) => {
514+ counter ! ( "ws_messages_total" , "backend" => bl2. clone( ) , "owner" => ow2. clone( ) , "direction" => "backend_to_client" ) . increment ( 1 ) ;
494515 if client_write. send ( Message :: Binary ( data) ) . await . is_err ( ) {
495516 break ;
496517 }
@@ -524,8 +545,12 @@ async fn handle_ws_connection(
524545 } ,
525546 }
526547
548+ let duration = connect_time. elapsed ( ) . as_secs_f64 ( ) ;
549+ gauge ! ( "ws_active_connections" , "backend" => backend_label. clone( ) , "owner" => owner. clone( ) ) . decrement ( 1.0 ) ;
550+ histogram ! ( "ws_connection_duration_seconds" , "backend" => backend_label. clone( ) , "owner" => owner. clone( ) ) . record ( duration) ;
551+
527552 info ! (
528- "WebSocket: {} disconnected from backend {}" ,
529- client_addr, backend_label
553+ "WebSocket: {} disconnected from backend {} (duration={:.1}s) " ,
554+ client_addr, backend_label, duration
530555 ) ;
531556}
0 commit comments