1- use std:: { sync:: Arc , time:: { Duration , SystemTime } } ;
1+ use std:: { collections :: HashMap , convert :: Infallible , marker :: PhantomData , sync:: Arc , time:: { Duration , SystemTime } } ;
22
3- use axum:: { extract:: { State , Path } , http:: StatusCode , routing :: get , Json , Router , response :: Response } ;
3+ use axum:: { extract:: { Path , State } , http:: StatusCode , response :: { sse :: { Event , KeepAlive } , Response , Sse } , routing :: get , Json , Router } ;
44use axum_extra:: { headers:: { authorization:: Basic , Authorization } , TypedHeader } ;
55use beam_lib:: ProxyId ;
6+ use futures_core:: Stream ;
67use serde:: { Serialize , Deserialize } ;
78use shared:: { crypto_jwt:: Authorized , Msg , config:: CONFIG_CENTRAL } ;
8- use tokio:: sync:: RwLock ;
9+ use tokio:: sync:: { Mutex , OwnedMutexGuard , RwLock } ;
910
10- use crate :: { health :: { Health , VaultStatus , Verdict , ProxyStatus , InitStatus } , compare_client_server_version:: log_version_mismatch} ;
11+ use crate :: compare_client_server_version:: log_version_mismatch;
1112
1213#[ derive( Serialize ) ]
1314struct HealthOutput {
@@ -16,6 +17,58 @@ struct HealthOutput {
1617 init_status : InitStatus
1718}
1819
20+ #[ derive( Serialize ) ]
21+ #[ serde( rename_all = "lowercase" ) ]
22+ pub enum Verdict {
23+ Healthy ,
24+ Unhealthy ,
25+ Unknown ,
26+ }
27+
28+ impl Default for Verdict {
29+ fn default ( ) -> Self {
30+ Verdict :: Unknown
31+ }
32+ }
33+
34+ #[ derive( Debug , Serialize , Clone , Copy , Default ) ]
35+ #[ serde( rename_all = "lowercase" ) ]
36+ pub enum VaultStatus {
37+ Ok ,
38+ #[ default]
39+ Unknown ,
40+ OtherError ,
41+ LockedOrSealed ,
42+ Unreachable ,
43+ }
44+
45+ #[ derive( Debug , Serialize , Clone , Copy , Default ) ]
46+ #[ serde( rename_all = "lowercase" ) ]
47+ pub enum InitStatus {
48+ #[ default]
49+ Unknown ,
50+ FetchingIntermediateCert ,
51+ Done
52+ }
53+
54+ #[ derive( Debug , Default ) ]
55+ pub struct Health {
56+ pub vault : VaultStatus ,
57+ pub initstatus : InitStatus ,
58+ proxies : HashMap < ProxyId , ProxyStatus >
59+ }
60+
61+ #[ derive( Debug , Clone , Default ) ]
62+ struct ProxyStatus {
63+ online_guard : Arc < Mutex < Option < SystemTime > > >
64+ }
65+
66+ impl ProxyStatus {
67+ pub fn is_online ( & self ) -> bool {
68+ self . online_guard . try_lock ( ) . is_err ( )
69+ }
70+ }
71+
1972pub ( crate ) fn router ( health : Arc < RwLock < Health > > ) -> Router {
2073 Router :: new ( )
2174 . route ( "/v1/health" , get ( handler) )
@@ -46,14 +99,14 @@ async fn handler(
4699}
47100
48101async fn get_all_proxies ( State ( state) : State < Arc < RwLock < Health > > > ) -> Json < Vec < ProxyId > > {
49- Json ( state. read ( ) . await . proxies . keys ( ) . cloned ( ) . collect ( ) )
102+ Json ( state. read ( ) . await . proxies . iter ( ) . filter ( | ( _ , v ) | v . is_online ( ) ) . map ( | ( k , _ ) | k ) . cloned ( ) . collect ( ) )
50103}
51104
52105async fn proxy_health (
53106 State ( state) : State < Arc < RwLock < Health > > > ,
54107 Path ( proxy) : Path < ProxyId > ,
55108 auth : TypedHeader < Authorization < Basic > >
56- ) -> Result < ( StatusCode , Json < ProxyStatus > ) , StatusCode > {
109+ ) -> Result < ( StatusCode , Json < serde_json :: Value > ) , StatusCode > {
57110 let Some ( ref monitoring_key) = CONFIG_CENTRAL . monitoring_api_key else {
58111 return Err ( StatusCode :: NOT_IMPLEMENTED ) ;
59112 } ;
@@ -63,10 +116,12 @@ async fn proxy_health(
63116 }
64117
65118 if let Some ( reported_back) = state. read ( ) . await . proxies . get ( & proxy) {
66- if reported_back. online ( ) {
67- Err ( StatusCode :: OK )
119+ if let Ok ( last_disconnect) = reported_back. online_guard . try_lock ( ) . as_deref ( ) . copied ( ) {
120+ Ok ( ( StatusCode :: SERVICE_UNAVAILABLE , Json ( serde_json:: json!( {
121+ "last_disconnect" : last_disconnect
122+ } ) ) ) )
68123 } else {
69- Ok ( ( StatusCode :: SERVICE_UNAVAILABLE , Json ( reported_back . clone ( ) ) ) )
124+ Err ( StatusCode :: OK )
70125 }
71126 } else {
72127 Err ( StatusCode :: NOT_FOUND )
@@ -76,48 +131,38 @@ async fn proxy_health(
76131async fn get_control_tasks (
77132 State ( state) : State < Arc < RwLock < Health > > > ,
78133 proxy_auth : Authorized ,
79- ) -> StatusCode {
134+ ) -> Result < Sse < ForeverStream > , StatusCode > {
80135 let proxy_id = proxy_auth. get_from ( ) . proxy_id ( ) ;
81136 // Once this is freed the connection will be removed from the map of connected proxies again
82137 // This ensures that when the connection is dropped and therefore this response future the status of this proxy will be updated
83- let _connection_remover = ConnectedGuard :: connect ( & proxy_id, & state) . await ;
84-
85- // In the future, this will wait for control tasks for the given proxy
86- tokio:: time:: sleep ( Duration :: from_secs ( 60 * 60 ) ) . await ;
138+ let status_mutex = state
139+ . write ( )
140+ . await
141+ . proxies
142+ . entry ( proxy_id)
143+ . or_default ( )
144+ . online_guard
145+ . clone ( ) ;
146+ let Ok ( connect_guard) = tokio:: time:: timeout ( Duration :: from_secs ( 60 ) , status_mutex. lock_owned ( ) ) . await
147+ else {
148+ return Err ( StatusCode :: CONFLICT ) ;
149+ } ;
87150
88- StatusCode :: OK
151+ Ok ( Sse :: new ( ForeverStream ( connect_guard ) ) . keep_alive ( KeepAlive :: new ( ) ) )
89152}
90153
91- struct ConnectedGuard < ' a > {
92- proxy : & ' a ProxyId ,
93- state : & ' a Arc < RwLock < Health > >
94- }
154+ struct ForeverStream ( OwnedMutexGuard < Option < SystemTime > > ) ;
95155
96- impl < ' a > ConnectedGuard < ' a > {
97- async fn connect ( proxy : & ' a ProxyId , state : & ' a Arc < RwLock < Health > > ) -> ConnectedGuard < ' a > {
98- {
99- state. write ( ) . await . proxies
100- . entry ( proxy. clone ( ) )
101- . and_modify ( ProxyStatus :: connect)
102- . or_insert ( ProxyStatus :: new ( ) ) ;
103- }
104- Self { proxy, state }
156+ impl Stream for ForeverStream {
157+ type Item = Result < Event , Infallible > ;
158+
159+ fn poll_next ( self : std:: pin:: Pin < & mut Self > , _cx : & mut std:: task:: Context < ' _ > ) -> std:: task:: Poll < Option < Self :: Item > > {
160+ std:: task:: Poll :: Pending
105161 }
106162}
107163
108- impl < ' a > Drop for ConnectedGuard < ' a > {
164+ impl Drop for ForeverStream {
109165 fn drop ( & mut self ) {
110- let proxy_id = self . proxy . clone ( ) ;
111- let map = self . state . clone ( ) ;
112- tokio:: spawn ( async move {
113- // We wait here for one second to give the client a bit of time to reconnect incrementing the connection count so that it will be one again after the decrement
114- tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
115- map. write ( )
116- . await
117- . proxies
118- . get_mut ( & proxy_id)
119- . expect ( "Has to exist as we don't remove items and the constructor of this type inserts the entry" )
120- . disconnect ( ) ;
121- } ) ;
166+ * self . 0 = Some ( SystemTime :: now ( ) ) ;
122167 }
123- }
168+ }
0 commit comments