Skip to content

Commit 945af5b

Browse files
authored
feat: Change health connection to SSE (#209)
* feat: Change health connection to SSE * docs: document health connection
1 parent ba6bbae commit 945af5b

File tree

7 files changed

+141
-162
lines changed

7 files changed

+141
-162
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ Date: Mon, 27 Jun 2022 14:26:45 GMT
399399

400400
As part of making this API performant, all reading endpoints support long-polling as an efficient alternative to regular (repeated) polling. Using this function requires the following parameters:
401401

402-
- `wait_count`: The API call will block until at least this many results are available. If there are more matching tasks/results avalible all of them will be returned.
402+
- `wait_count`: The API call will block until at least this many results are available. If there are more matching tasks/results available all of them will be returned.
403403
- `wait_time`: ... or this time has passed (if not stated differently, e.g., by adding 'm', 'h', 'ms', ..., this is interpreted as seconds), whichever comes first.
404404

405405
For example, retrieving a task's results:
@@ -629,6 +629,12 @@ Samply.Beam encrypts all information in the `body` fields of both Tasks and Resu
629629

630630
The data is symmetrically encrypted using the Authenticated Encryption with Authenticated Data (AEAD) algorithm "XChaCha20Poly1305", a widespread algorithm (e.g., mandatory for the TLS protocol), regarded as highly secure by experts. The used [chacha20poly1305 library](https://docs.rs/chacha20poly1305/latest/chacha20poly1305/) was sublected to a [security audit](https://research.nccgroup.com/2020/02/26/public-report-rustcrypto-aes-gcm-and-chacha20poly1305-implementation-review/), with no significant findings. The randomly generated symmetric keys are encapsulated in a RSA encrypted ciphertext using OAEP Padding. This ensures, that only the intended recipients can decrypt the key and subsequently the transferred data.
631631

632+
### Health check connection
633+
634+
The beam proxy tries to keep a permanent connection to the broker to make it possible to see which sites are currently connected.
635+
This also allows us to detected invalid connection states such as multiple proxies with the same proxy id connecting simultaneously.
636+
In that case the second proxy trying to connect will receive a 409 status code and shut down.
637+
632638
## Roadmap
633639

634640
- [X] API Key authentication of local applications

broker/src/crypto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::time::Duration;
99
use tokio::{sync::RwLock, time::timeout};
1010
use tracing::{debug, error, warn, info};
1111

12-
use crate::health::{self, Health, VaultStatus};
12+
use crate::serve_health::{Health, VaultStatus};
1313

1414
pub struct GetCertsFromPki {
1515
pki_realm: String,

broker/src/health.rs

Lines changed: 0 additions & 85 deletions
This file was deleted.

broker/src/main.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
mod banner;
44
mod crypto;
5-
mod health;
65
mod serve;
76
mod serve_health;
87
mod serve_pki;
@@ -15,7 +14,7 @@ mod compare_client_server_version;
1514
use std::{collections::HashMap, sync::Arc, time::Duration};
1615

1716
use crypto::GetCertsFromPki;
18-
use health::{Health, InitStatus};
17+
use serve_health::{Health, InitStatus};
1918
use once_cell::sync::Lazy;
2019
use shared::{config::CONFIG_CENTRAL, *, errors::SamplyBeamError};
2120
use tokio::sync::RwLock;
@@ -45,8 +44,8 @@ pub async fn main() -> anyhow::Result<()> {
4544

4645
async fn init_broker_ca_chain(health: Arc<RwLock<Health>>) {
4746
{
48-
health.write().await.initstatus = health::InitStatus::FetchingIntermediateCert
47+
health.write().await.initstatus = InitStatus::FetchingIntermediateCert
4948
}
5049
shared::crypto::init_ca_chain().await.expect("Failed to init broker ca chain");
51-
health.write().await.initstatus = health::InitStatus::Done;
50+
health.write().await.initstatus = InitStatus::Done;
5251
}

broker/src/serve.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use tokio::{
2020
};
2121
use tracing::{debug, info, trace, warn};
2222

23-
use crate::{banner, crypto, health::Health, serve_health, serve_pki, serve_tasks, compare_client_server_version};
23+
use crate::{banner, crypto, serve_health::Health, serve_health, serve_pki, serve_tasks, compare_client_server_version};
2424

2525
pub(crate) async fn serve(health: Arc<RwLock<Health>>) -> anyhow::Result<()> {
2626
let app = serve_tasks::router()

broker/src/serve_health.rs

Lines changed: 87 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
use std::{sync::Arc, time::{Duration, SystemTime}};
1+
use std::{collections::HashMap, convert::Infallible, marker::PhantomData, sync::Arc, time::{Duration, SystemTime}};
22

3-
use axum::{extract::{State, Path}, http::StatusCode, routing::get, Json, Router, response::Response};
3+
use axum::{extract::{Path, State}, http::StatusCode, response::{sse::{Event, KeepAlive}, Response, Sse}, routing::get, Json, Router};
44
use axum_extra::{headers::{authorization::Basic, Authorization}, TypedHeader};
55
use beam_lib::ProxyId;
6+
use futures_core::Stream;
67
use serde::{Serialize, Deserialize};
78
use shared::{crypto_jwt::Authorized, Msg, config::CONFIG_CENTRAL};
8-
use tokio::sync::RwLock;
9+
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
910

10-
use crate::{health::{Health, VaultStatus, Verdict, ProxyStatus, InitStatus}, compare_client_server_version::log_version_mismatch};
11+
use crate::compare_client_server_version::log_version_mismatch;
1112

1213
#[derive(Serialize)]
1314
struct HealthOutput {
@@ -16,6 +17,58 @@ struct HealthOutput {
1617
init_status: InitStatus
1718
}
1819

20+
#[derive(Serialize)]
21+
#[serde(rename_all = "lowercase")]
22+
pub enum Verdict {
23+
Healthy,
24+
Unhealthy,
25+
Unknown,
26+
}
27+
28+
impl Default for Verdict {
29+
fn default() -> Self {
30+
Verdict::Unknown
31+
}
32+
}
33+
34+
#[derive(Debug, Serialize, Clone, Copy, Default)]
35+
#[serde(rename_all = "lowercase")]
36+
pub enum VaultStatus {
37+
Ok,
38+
#[default]
39+
Unknown,
40+
OtherError,
41+
LockedOrSealed,
42+
Unreachable,
43+
}
44+
45+
#[derive(Debug, Serialize, Clone, Copy, Default)]
46+
#[serde(rename_all = "lowercase")]
47+
pub enum InitStatus {
48+
#[default]
49+
Unknown,
50+
FetchingIntermediateCert,
51+
Done
52+
}
53+
54+
#[derive(Debug, Default)]
55+
pub struct Health {
56+
pub vault: VaultStatus,
57+
pub initstatus: InitStatus,
58+
proxies: HashMap<ProxyId, ProxyStatus>
59+
}
60+
61+
#[derive(Debug, Clone, Default)]
62+
struct ProxyStatus {
63+
online_guard: Arc<Mutex<Option<SystemTime>>>
64+
}
65+
66+
impl ProxyStatus {
67+
pub fn is_online(&self) -> bool {
68+
self.online_guard.try_lock().is_err()
69+
}
70+
}
71+
1972
pub(crate) fn router(health: Arc<RwLock<Health>>) -> Router {
2073
Router::new()
2174
.route("/v1/health", get(handler))
@@ -46,14 +99,14 @@ async fn handler(
4699
}
47100

48101
async fn get_all_proxies(State(state): State<Arc<RwLock<Health>>>) -> Json<Vec<ProxyId>> {
49-
Json(state.read().await.proxies.keys().cloned().collect())
102+
Json(state.read().await.proxies.iter().filter(|(_, v)| v.is_online()).map(|(k, _)| k).cloned().collect())
50103
}
51104

52105
async fn proxy_health(
53106
State(state): State<Arc<RwLock<Health>>>,
54107
Path(proxy): Path<ProxyId>,
55108
auth: TypedHeader<Authorization<Basic>>
56-
) -> Result<(StatusCode, Json<ProxyStatus>), StatusCode> {
109+
) -> Result<(StatusCode, Json<serde_json::Value>), StatusCode> {
57110
let Some(ref monitoring_key) = CONFIG_CENTRAL.monitoring_api_key else {
58111
return Err(StatusCode::NOT_IMPLEMENTED);
59112
};
@@ -63,10 +116,12 @@ async fn proxy_health(
63116
}
64117

65118
if let Some(reported_back) = state.read().await.proxies.get(&proxy) {
66-
if reported_back.online() {
67-
Err(StatusCode::OK)
119+
if let Ok(last_disconnect) = reported_back.online_guard.try_lock().as_deref().copied() {
120+
Ok((StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({
121+
"last_disconnect": last_disconnect
122+
}))))
68123
} else {
69-
Ok((StatusCode::SERVICE_UNAVAILABLE, Json(reported_back.clone())))
124+
Err(StatusCode::OK)
70125
}
71126
} else {
72127
Err(StatusCode::NOT_FOUND)
@@ -76,48 +131,38 @@ async fn proxy_health(
76131
async fn get_control_tasks(
77132
State(state): State<Arc<RwLock<Health>>>,
78133
proxy_auth: Authorized,
79-
) -> StatusCode {
134+
) -> Result<Sse<ForeverStream>, StatusCode> {
80135
let proxy_id = proxy_auth.get_from().proxy_id();
81136
// Once this is freed the connection will be removed from the map of connected proxies again
82137
// This ensures that when the connection is dropped and therefore this response future the status of this proxy will be updated
83-
let _connection_remover = ConnectedGuard::connect(&proxy_id, &state).await;
84-
85-
// In the future, this will wait for control tasks for the given proxy
86-
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
138+
let status_mutex = state
139+
.write()
140+
.await
141+
.proxies
142+
.entry(proxy_id)
143+
.or_default()
144+
.online_guard
145+
.clone();
146+
let Ok(connect_guard) = tokio::time::timeout(Duration::from_secs(60), status_mutex.lock_owned()).await
147+
else {
148+
return Err(StatusCode::CONFLICT);
149+
};
87150

88-
StatusCode::OK
151+
Ok(Sse::new(ForeverStream(connect_guard)).keep_alive(KeepAlive::new()))
89152
}
90153

91-
struct ConnectedGuard<'a> {
92-
proxy: &'a ProxyId,
93-
state: &'a Arc<RwLock<Health>>
94-
}
154+
struct ForeverStream(OwnedMutexGuard<Option<SystemTime>>);
95155

96-
impl<'a> ConnectedGuard<'a> {
97-
async fn connect(proxy: &'a ProxyId, state: &'a Arc<RwLock<Health>>) -> ConnectedGuard<'a> {
98-
{
99-
state.write().await.proxies
100-
.entry(proxy.clone())
101-
.and_modify(ProxyStatus::connect)
102-
.or_insert(ProxyStatus::new());
103-
}
104-
Self { proxy, state }
156+
impl Stream for ForeverStream {
157+
type Item = Result<Event, Infallible>;
158+
159+
fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
160+
std::task::Poll::Pending
105161
}
106162
}
107163

108-
impl<'a> Drop for ConnectedGuard<'a> {
164+
impl Drop for ForeverStream {
109165
fn drop(&mut self) {
110-
let proxy_id = self.proxy.clone();
111-
let map = self.state.clone();
112-
tokio::spawn(async move {
113-
// We wait here for one second to give the client a bit of time to reconnect incrementing the connection count so that it will be one again after the decrement
114-
tokio::time::sleep(Duration::from_secs(1)).await;
115-
map.write()
116-
.await
117-
.proxies
118-
.get_mut(&proxy_id)
119-
.expect("Has to exist as we don't remove items and the constructor of this type inserts the entry")
120-
.disconnect();
121-
});
166+
*self.0 = Some(SystemTime::now());
122167
}
123-
}
168+
}

0 commit comments

Comments
 (0)