Skip to content

Commit de88d37

Browse files
authored
Add health endpoint that returns commit lag and gaps (#423)
* Add a simple lag tracker for health checks Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com> * Use rolling-window bucket histogram so that track lag over a longer period without increasing space usage. * reduce window chunks Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com> * track queue size too Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com> * fix up comment Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com> --------- Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com>
1 parent f8d801d commit de88d37

File tree

10 files changed

+475
-5
lines changed

10 files changed

+475
-5
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/corro-agent/src/agent/handlers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ pub async fn handle_changes(
631631
));
632632

633633
let max_seen_cache_len: usize = max_queue_len;
634+
let metrics_tracker = agent.metrics_tracker();
634635

635636
// unlikely, but max_seen_cache_len can be less than 10, in that case we want to just clear the whole cache
636637
// (todo): put some validation in config instead
@@ -697,11 +698,11 @@ pub async fn handle_changes(
697698

698699
_ = max_wait.tick() => {
699700
// got a wait interval tick...
700-
701701
gauge!("corro.agent.changes.in_queue").set(buf_cost as f64);
702702
gauge!("corro.agent.changesets.in_queue").set(queue.len() as f64);
703703
gauge!("corro.agent.changes.processing.jobs").set(join_set.len() as f64);
704704

705+
metrics_tracker.observe_queue_size(queue.len() as u64);
705706
if buf_cost < max_changes_chunk && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
706707
// we can process this right away
707708
debug!(%buf_cost, "spawning processing multiple changes from max wait interval");

crates/corro-agent/src/agent/setup.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use corro_types::{
4242
channel::{bounded, CorroReceiver},
4343
config::Config,
4444
members::Members,
45+
metrics_tracker::MetricsTracker,
4546
pubsub::{Matcher, SubsManager},
4647
schema::{init_schema, Schema},
4748
sqlite::CrConn,
@@ -243,6 +244,8 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
243244
}
244245
});
245246

247+
let metrics_tracker = MetricsTracker::new(Duration::from_secs(120), 5)?;
248+
246249
let opts = AgentOptions {
247250
gossip_server_endpoint,
248251
transport: transport.clone(),
@@ -280,6 +283,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
280283
cluster_id,
281284
subs_manager,
282285
updates_manager,
286+
metrics_tracker,
283287
tripwire,
284288
});
285289

crates/corro-agent/src/agent/util.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use crate::{
99
agent::{handlers, CountedExecutor, TO_CLEAR_COUNT},
1010
api::public::{
11-
api_v1_db_schema, api_v1_queries, api_v1_table_stats, api_v1_transactions,
11+
api_v1_db_schema, api_v1_health, api_v1_queries, api_v1_table_stats, api_v1_transactions,
1212
pubsub::{api_v1_sub_by_id, api_v1_subs},
1313
update::SharedUpdateBroadcastCache,
1414
},
@@ -285,6 +285,20 @@ pub async fn setup_http_api_handler(
285285
.layer(ConcurrencyLimitLayer::new(4)),
286286
),
287287
)
288+
.route(
289+
"/v1/health",
290+
get(api_v1_health).route_layer(
291+
tower::ServiceBuilder::new()
292+
.layer(HandleErrorLayer::new(|_error: BoxError| async {
293+
Ok::<_, Infallible>((
294+
StatusCode::SERVICE_UNAVAILABLE,
295+
"max concurrency limit reached".to_string(),
296+
))
297+
}))
298+
.layer(LoadShedLayer::new())
299+
.layer(ConcurrencyLimitLayer::new(4)),
300+
),
301+
)
288302
.layer(axum::middleware::from_fn(require_authz))
289303
.layer(
290304
tower::ServiceBuilder::new()
@@ -1017,6 +1031,7 @@ pub async fn process_multiple_changes(
10171031
if let Some(ts) = changeset.ts() {
10181032
let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration();
10191033
histogram!("corro.agent.changes.commit.lag.seconds").record(dur);
1034+
agent.metrics_tracker().observe_lag(dur.as_secs_f64());
10201035
}
10211036
}
10221037

crates/corro-agent/src/api/public/mod.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,18 @@ use std::{
77

88
use crate::api::utils::CountedBody;
99
use antithesis_sdk::assert_sometimes;
10-
use axum::{extract::ConnectInfo, response::IntoResponse, Extension};
10+
use axum::{
11+
extract::{ConnectInfo, Query},
12+
response::IntoResponse,
13+
Extension,
14+
};
1115
use bytes::{BufMut, BytesMut};
1216
use compact_str::ToCompactString;
1317
use corro_types::{
1418
agent::{Agent, ChangeError},
1519
api::{
16-
ColumnName, ExecResponse, ExecResult, QueryEvent, Statement, TableStatRequest,
17-
TableStatResponse,
20+
ColumnName, ExecResponse, ExecResult, HealthQuery, HealthResponse, QueryEvent, Statement,
21+
TableStatRequest, TableStatResponse,
1822
},
1923
base::CrsqlDbVersion,
2024
broadcast::Timestamp,
@@ -643,6 +647,64 @@ pub async fn api_v1_db_schema(
643647
)
644648
}
645649

650+
pub async fn api_v1_health(
651+
Extension(agent): Extension<Agent>,
652+
Query(query): Query<HealthQuery>,
653+
) -> (StatusCode, axum::Json<HealthResponse>) {
654+
match check_health(&agent).await {
655+
Ok((gaps, members)) => {
656+
let p99_lag = agent.metrics_tracker().quantile_lag(0.99).unwrap_or(0.0);
657+
let queue_size = agent.metrics_tracker().queue_size();
658+
let status = if query.gaps.is_some_and(|max| gaps > max)
659+
// we use queue size and p99 lag as a stronger metric for an unhealthy node
660+
// since a different node that is slow to send out changes can cause worse commit lag
661+
// even though the node is perfectly fine.
662+
|| (query.p99_lag.is_some_and(|max| p99_lag > max)
663+
&& query.queue_size.is_some_and(|max| queue_size > max))
664+
{
665+
StatusCode::SERVICE_UNAVAILABLE
666+
} else {
667+
StatusCode::OK
668+
};
669+
(
670+
status,
671+
axum::Json(HealthResponse::Response {
672+
gaps,
673+
members,
674+
p99_lag,
675+
queue_size,
676+
}),
677+
)
678+
}
679+
Err(e) => {
680+
error!("could not check health: {e}");
681+
(
682+
StatusCode::INTERNAL_SERVER_ERROR,
683+
axum::Json(HealthResponse::Error(e.to_string())),
684+
)
685+
}
686+
}
687+
}
688+
689+
async fn check_health(agent: &Agent) -> eyre::Result<(i64, i64)> {
690+
let read_conn = match agent.pool().read().await {
691+
Ok(conn) => conn,
692+
Err(e) => {
693+
error!("could not acquire read connection for health check: {e}");
694+
return Err(eyre::eyre!("unable to grab write conn"));
695+
}
696+
};
697+
698+
let gaps = read_conn
699+
.prepare_cached("SELECT COALESCE(SUM(end - start + 1), 0) FROM __corro_bookkeeping_gaps")?
700+
.query_row([], |row| row.get::<_, i64>(0))?;
701+
702+
let members = read_conn.prepare_cached(r#"
703+
SELECT COALESCE(COUNT(*), 0) FROM __corro_members WHERE json_extract(foca_state, "$.state") = "Alive""#)?
704+
.query_row([], |row| row.get::<_, i64>(0))?;
705+
706+
Ok((gaps, members))
707+
}
646708
/// Query the table status of the current node
647709
///
648710
/// Currently this endpoint only supports querying the row count for a

crates/corro-api-types/src/lib.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,26 @@ pub struct TableStatResponse {
241241
pub invalid_tables: Vec<String>,
242242
}
243243

244+
#[derive(Debug, Serialize, Deserialize)]
245+
pub struct HealthQuery {
246+
pub gaps: Option<i64>,
247+
pub p99_lag: Option<f64>,
248+
pub queue_size: Option<u64>,
249+
}
250+
251+
/// Contains status information about the node
252+
#[derive(Debug, Serialize, Deserialize)]
253+
#[serde(rename_all = "lowercase")]
254+
pub enum HealthResponse {
255+
Response {
256+
gaps: i64,
257+
members: i64,
258+
p99_lag: f64,
259+
queue_size: u64,
260+
},
261+
Error(String),
262+
}
263+
244264
#[derive(Debug, Copy, Clone, PartialEq)]
245265
pub struct SqliteValueRef<'a>(pub ValueRef<'a>);
246266

crates/corro-types/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ corro-api-types = { version = "0.1.0-alpha.1", path = "../corro-api-types" }
2323
corro-base-types = { version = "0.1.0-alpha.1", path = "../corro-base-types" }
2424
deadpool = { workspace = true }
2525
enquote = { workspace = true }
26+
eyre = { workspace = true }
2627
fallible-iterator = { workspace = true }
2728
foca = { workspace = true }
2829
futures = { workspace = true }

crates/corro-types/src/agent.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use crate::{
4848
broadcast::{BroadcastInput, ChangeSource, ChangeV1, FocaInput, Timestamp},
4949
channel::{bounded, CorroSender},
5050
config::Config,
51+
metrics_tracker::MetricsTracker,
5152
pubsub::SubsManager,
5253
schema::Schema,
5354
sqlite::{
@@ -89,6 +90,8 @@ pub struct AgentConfig {
8990

9091
pub updates_manager: UpdatesManager,
9192

93+
pub metrics_tracker: MetricsTracker,
94+
9295
pub tripwire: Tripwire,
9396
}
9497

@@ -100,6 +103,7 @@ pub struct AgentInner {
100103
external_addr: Option<SocketAddr>,
101104
api_addr: SocketAddr,
102105
members: RwLock<Members>,
106+
metrics_tracker: MetricsTracker,
103107
clock: Arc<uhlc::HLC>,
104108
booked: Booked,
105109
tx_bcast: CorroSender<BroadcastInput>,
@@ -130,6 +134,7 @@ impl Agent {
130134
external_addr: config.external_addr,
131135
api_addr: config.api_addr,
132136
members: config.members,
137+
metrics_tracker: config.metrics_tracker,
133138
clock: config.clock,
134139
booked: config.booked,
135140
tx_bcast: config.tx_bcast,
@@ -231,6 +236,10 @@ impl Agent {
231236
&self.0.schema
232237
}
233238

239+
pub fn metrics_tracker(&self) -> &MetricsTracker {
240+
&self.0.metrics_tracker
241+
}
242+
234243
pub fn db_path(&self) -> Utf8PathBuf {
235244
self.0.config.load().db.path.clone()
236245
}

crates/corro-types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub mod channel;
88
pub mod config;
99
pub mod gauge;
1010
pub mod members;
11+
pub mod metrics_tracker;
1112
pub mod pubsub;
1213
pub mod schema;
1314
pub mod sqlite;

0 commit comments

Comments
 (0)