
Commit f0a6e83

Authored by saibatizoku, Mr-Leshiy, and stevenj
fix(cat-gateway): Correct Service Health logic (#1974)
* wip(rust/signed_doc): add atomic variables for live check, update endpoint and middleware
* wip(rust/signed_doc): reset live counter
* fix(cat-gateway): typo
* fix(cat-gateway): use timestamp for LIVE_COUNTER, update logic
* fix(cat-gateway): refactor code into health utilities
* fix(cat-gateway): fix panic catcher to disable live service flag
* fix(cat-gateway): remove unused atomic-counter crate
* fix(cat-gateway): refactor health::live utilities into proper module
* fix(cat-gateway): restore live counter logic, set env var for threshold
* fix(cat-gateway): code format
* fix(cat-gateway): return service unavailable with proper error
* fix(cat-gateway): refactor health::start utilities into proper module
* feat(cat-gateway): add service::utilities::health::ready module
* wip(rust/signed_doc): add atomic variables for live check, update endpoint and middleware
* wip(rust/signed_doc): reset live counter
* fix(cat-gateway): typo
* fix(cat-gateway): use timestamp for LIVE_COUNTER, update logic
* fix(cat-gateway): refactor code into health utilities
* fix(cat-gateway): fix panic catcher to disable live service flag
* fix(cat-gateway): remove unused atomic-counter crate
* fix(cat-gateway): refactor health::live utilities into proper module
* fix(cat-gateway): restore live counter logic, set env var for threshold
* fix(cat-gateway): code format
* fix(cat-gateway): return service unavailable with proper error
* fix(cat-gateway): add atomic vars to health::started to keep track and set state
* fix(cat-gateway): add atomic var to health::started to keep track of chain follower
* fix(cat-gateway): implement logic for health::started flags
* fix(cat-gateway): add middleware to check DB connections
* fix(cat-gateway): attempt DB reconnect if health/ready check fails
* fix(cat-gateway): implement logic for health::ready endpoint
* fix(cat-gateway): handle DB errors at endpoints
* chore(cat-gateway): fix doc comments
* fix(cat-gateway): add suggested fixes
* fix(cat-gateway): simplify boolean logic, correctly set flag when follower first syncs
* fix(cat-gateway): add suggested fix
* chore(docs): fix doc comment
* fix(cat-gateway): update doc comments for health endpoints
* fix(cat-gateway): add suggested fix to doc comments
* wip
* fix(cat-gateway): set index db liveness after waiting for it to be ready

---------

Co-authored-by: Alex Pozhylenkov <[email protected]>
Co-authored-by: Steven Johnson <[email protected]>
1 parent 391f74b commit f0a6e83

17 files changed: +471 -35 lines changed

catalyst-gateway/bin/src/cardano/mod.rs

Lines changed: 16 additions & 1 deletion
@@ -23,6 +23,7 @@ use crate::{
         },
         session::CassandraSession,
     },
+    service::utilities::health::{follower_has_first_reached_tip, set_follower_first_reached_tip},
     settings::{chain_follower, Settings},
 };
 
@@ -31,7 +32,7 @@ pub(crate) mod event;
 pub(crate) mod util;
 
 /// How long we wait between checks for connection to the indexing DB to be ready.
-const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
+pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
 
 /// Start syncing a particular network
 async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
@@ -294,6 +295,11 @@ fn sync_subchain(
             );
         }
 
+        // Update flag if this is the first time reaching TIP.
+        if chain_update.tip && !follower_has_first_reached_tip() {
+            set_follower_first_reached_tip();
+        }
+
         update_block_state(
             block,
             &mut first_indexed_block,
@@ -435,6 +441,12 @@ impl SyncTask {
     /// Primary Chain Follower task.
     ///
     /// This continuously runs in the background, and never terminates.
+    ///
+    /// Sets the Index DB liveness flag to true if it is not already set.
+    ///
+    /// Sets the Chain Follower Has First Reached Tip flag to true if it is not already
+    /// set.
+    #[allow(clippy::too_many_lines)]
     async fn run(&mut self) {
         // We can't sync until the local chain data is synced.
         // This call will wait until we sync.
@@ -453,7 +465,10 @@ impl SyncTask {
         // Wait for indexing DB to be ready before continuing.
         // We do this after the above, because other nodes may have finished already, and we don't
         // want to wait do any work they already completed while we were fetching the blockchain.
+        //
+        // After waiting, we set the liveness flag to true if it is not already set.
         drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
+
         info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
         self.sync_status = get_sync_status().await;
         debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
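
The hunks above call `follower_has_first_reached_tip()` and `set_follower_first_reached_tip()` from `service::utilities::health`, but that module is not part of this diff. A minimal sketch of how such a one-way (latching) flag could look, assuming a plain `AtomicBool`; the static's name is hypothetical:

    use std::sync::atomic::{AtomicBool, Ordering};

    /// Hypothetical static; the real name inside `service::utilities::health` is not shown here.
    static FOLLOWER_FIRST_REACHED_TIP: AtomicBool = AtomicBool::new(false);

    /// Returns `true` once the chain follower has reached TIP for the first time.
    pub(crate) fn follower_has_first_reached_tip() -> bool {
        FOLLOWER_FIRST_REACHED_TIP.load(Ordering::SeqCst)
    }

    /// Latches the flag to `true`; it is never cleared for the lifetime of the process.
    pub(crate) fn set_follower_first_reached_tip() {
        FOLLOWER_FIRST_REACHED_TIP.store(true, Ordering::SeqCst);
    }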

catalyst-gateway/bin/src/cli.rs

Lines changed: 20 additions & 5 deletions
@@ -1,5 +1,5 @@
 //! CLI interpreter for the service
-use std::{io::Write, path::PathBuf};
+use std::{io::Write, path::PathBuf, time::Duration};
 
 use clap::Parser;
 use tracing::{error, info};
@@ -8,8 +8,10 @@ use crate::{
     cardano::start_followers,
     db::{self, index::session::CassandraSession},
     service::{
-        self, started,
-        utilities::health::{is_live, live_counter_reset},
+        self,
+        utilities::health::{
+            condition_for_started, is_live, live_counter_reset, service_has_started, set_to_started,
+        },
     },
     settings::{ServiceSettings, Settings},
 };
@@ -49,13 +51,15 @@ impl Cli {
 
         info!("Catalyst Gateway - Starting");
 
-        // Start the DB's
+        // Start the DB's.
         CassandraSession::init();
+
         db::event::establish_connection();
 
         // Start the chain indexing follower.
         start_followers().await?;
 
+        // Start the API service.
         let handle = tokio::spawn(async move {
             match service::run().await {
                 Ok(()) => info!("Endpoints started ok"),
@@ -66,6 +70,7 @@ impl Cli {
         });
         tasks.push(handle);
 
+        // Start task to reset the service 'live counter' at a regular interval.
         let handle = tokio::spawn(async move {
             while is_live() {
                 tokio::time::sleep(Settings::service_live_timeout_interval()).await;
@@ -74,8 +79,18 @@ impl Cli {
         });
         tasks.push(handle);
 
-        started();
+        // Start task to wait for the service 'started' flag to be `true`.
+        let handle = tokio::spawn(async move {
+            while !service_has_started() {
+                tokio::time::sleep(Duration::from_secs(1)).await;
+                if condition_for_started() {
+                    set_to_started();
+                }
+            }
+        });
+        tasks.push(handle);
 
+        // Run all asynchronous tasks to completion.
         for task in tasks {
            task.await?;
        }
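
The new startup task above polls `condition_for_started()` once per second and latches the result with `set_to_started()`. Those helpers come from `service::utilities::health` and are not shown in this commit; a plausible sketch, assuming the "started" condition is simply the conjunction of the other health flags this PR introduces:

    use std::sync::atomic::{AtomicBool, Ordering};

    /// Hypothetical static tracking whether startup has completed.
    static STARTED: AtomicBool = AtomicBool::new(false);

    /// Has the service finished starting?
    pub(crate) fn service_has_started() -> bool {
        STARTED.load(Ordering::SeqCst)
    }

    /// Latch the service into the "started" state.
    pub(crate) fn set_to_started() {
        STARTED.store(true, Ordering::SeqCst);
    }

    /// Assumed condition: every dependency tracked by this PR is healthy.
    /// `event_db_is_live`, `index_db_is_live` and `follower_has_first_reached_tip`
    /// are the accessors imported elsewhere in this commit.
    pub(crate) fn condition_for_started() -> bool {
        event_db_is_live() && index_db_is_live() && follower_has_first_reached_tip()
    }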

catalyst-gateway/bin/src/db/event/mod.rs

Lines changed: 10 additions & 1 deletion
@@ -14,7 +14,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
 use tokio_postgres::{types::ToSql, NoTls, Row};
 use tracing::{debug, debug_span, error, Instrument};
 
-use crate::settings::Settings;
+use crate::{service::utilities::health::set_event_db_liveness, settings::Settings};
 
 pub(crate) mod common;
 pub(crate) mod config;
@@ -160,6 +160,11 @@ impl EventDB {
         Ok(())
     }
 
+    /// Checks that connection to `EventDB` is available.
+    pub(crate) fn connection_is_ok() -> bool {
+        EVENT_DB_POOL.get().is_some()
+    }
+
     /// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
     async fn explain_analyze_rollback(
         stmt: &str, params: &[&(dyn ToSql + Sync)],
@@ -239,6 +244,8 @@ impl EventDB {
 ///
 /// The env var "`DATABASE_URL`" can be set directly as an anv var, or in a
 /// `.env` file.
+///
+/// If connection to the pool is `OK`, the `LIVE_EVENT_DB` atomic flag is set to `true`.
 pub fn establish_connection() {
     let (url, user, pass) = Settings::event_db_settings();
 
@@ -261,5 +268,7 @@ pub fn establish_connection() {
 
     if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
         error!("Failed to set event db pool. Called Twice?");
+    } else {
+        set_event_db_liveness(true);
     }
 }

catalyst-gateway/bin/src/db/index/session.rs

Lines changed: 6 additions & 1 deletion
@@ -24,7 +24,10 @@ use super::{
     },
     schema::create_schema,
 };
-use crate::settings::{cassandra_db, Settings};
+use crate::{
+    service::utilities::health::set_index_db_liveness,
+    settings::{cassandra_db, Settings},
+};
 
 /// Configuration Choices for compression
 #[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
@@ -130,11 +133,13 @@ impl CassandraSession {
         loop {
             if !ignore_err {
                 if let Some(err) = INIT_SESSION_ERROR.get() {
+                    set_index_db_liveness(false);
                     return Err(err.clone());
                 }
             }
 
             if Self::is_ready() {
+                set_index_db_liveness(true);
                 return Ok(());
             }

catalyst-gateway/bin/src/service/api/health/live_get.rs

Lines changed: 61 additions & 1 deletion
@@ -1,4 +1,64 @@
-//! Implementation of the `GET /health/live` endpoint.
+//! # Implementation of the `GET /health/live` endpoint.
+//!
+//! This module provides an HTTP endpoint to monitor the liveness of the API service using
+//! a simple counter mechanism. It uses an atomic boolean named `IS_LIVE` to track whether
+//! the service is operational. The `IS_LIVE` boolean is initially set to `true`.
+//!
+//! ## Key Features
+//!
+//! 1. **Atomic Counter**: The endpoint maintains an atomic counter that increments
+//!    every time the endpoint is accessed. This counter helps track the number of
+//!    requests made to the endpoint.
+//!
+//! 2. **Counter Reset**: Every 30 seconds, the counter is automatically reset to zero.
+//!    This ensures that the count reflects recent activity rather than cumulative usage
+//!    over a long period.
+//!
+//! 3. **Threshold Check**: If the counter reaches a predefined threshold (e.g., 100),
+//!    the `IS_LIVE` boolean is set to `false`. This indicates that the service is no
+//!    longer operational. Once `IS_LIVE` is set to `false`, it cannot be changed back to
+//!    `true`.
+//!
+//! 4. **Response Logic**:
+//!    - If `IS_LIVE` is `true`, the endpoint returns a `204 No Content` response,
+//!      indicating that the service is healthy and operational.
+//!    - If `IS_LIVE` is `false`, the endpoint returns a `503 Service Unavailable`
+//!      response, indicating that the service is no longer operational.
+//!
+//! ## How It Works
+//!
+//! - When the endpoint is called, the atomic counter increments by 1.
+//! - Every 30 seconds, the counter is reset to 0 to ensure it only reflects recent
+//!   activity.
+//! - If the counter reaches the threshold (e.g., 100), the `IS_LIVE` boolean is set to
+//!   `false`.
+//! - Once `IS_LIVE` is `false`, the endpoint will always respond with `503 Service
+//!   Unavailable`.
+//! - If `IS_LIVE` is `true`, the endpoint responds with `204 No Content`.
+//!
+//! ## Example Scenarios
+//!
+//! 1. **Normal Operation**:
+//!    - The counter is below the threshold.
+//!    - `IS_LIVE` remains `true`.
+//!    - The endpoint returns `204 No Content`.
+//!
+//! 2. **Threshold Exceeded**:
+//!    - The counter reaches 100.
+//!    - `IS_LIVE` is set to `false`.
+//!    - The endpoint returns `503 Service Unavailable`.
+//!
+//! 3. **After Threshold Exceeded**:
+//!    - The counter is reset to 0, but `IS_LIVE` remains `false`.
+//!    - The endpoint continues to return `503 Service Unavailable`.
+//!
+//! ## Notes
+//!
+//! - The `IS_LIVE` boolean is atomic, meaning it is thread-safe and can be accessed
+//!   concurrently without issues.
+//!
+//! This endpoint is useful for monitoring service liveness and automatically marking it
+//! as unavailable if it becomes overloaded or encounters issues.
 
 use poem_openapi::ApiResponse;
 
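The module documentation above describes the liveness mechanism in prose; the counter, reset, and threshold primitives themselves live in `service::utilities::health` (see `is_live` and `live_counter_reset` used in `cli.rs`) and are not part of this diff. A rough sketch of that mechanism under those assumptions; the increment function name and the hard-coded threshold are placeholders (per the commit message, the real threshold comes from an env var):

    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    /// Service liveness flag; starts `true` and is only ever latched to `false`.
    static IS_LIVE: AtomicBool = AtomicBool::new(true);
    /// Number of liveness probes seen since the last reset.
    static LIVE_COUNTER: AtomicU64 = AtomicU64::new(0);
    /// Placeholder threshold; the real value is configured via an env var.
    const LIVE_COUNTER_THRESHOLD: u64 = 100;

    /// Is the service still considered live?
    pub(crate) fn is_live() -> bool {
        IS_LIVE.load(Ordering::SeqCst)
    }

    /// Reset the probe counter; called on a timer from `cli.rs`.
    pub(crate) fn live_counter_reset() {
        LIVE_COUNTER.store(0, Ordering::SeqCst);
    }

    /// Record one probe; if too many arrive between resets, mark the service as not live.
    pub(crate) fn live_counter_increment() {
        let count = LIVE_COUNTER.fetch_add(1, Ordering::SeqCst).saturating_add(1);
        if count >= LIVE_COUNTER_THRESHOLD {
            IS_LIVE.store(false, Ordering::SeqCst);
        }
    }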

catalyst-gateway/bin/src/service/api/health/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -7,7 +7,6 @@ mod inspection_get;
 mod live_get;
 mod ready_get;
 mod started_get;
-pub(crate) use started_get::started;
 
 /// Health API Endpoints
 pub(crate) struct HealthApi;

catalyst-gateway/bin/src/service/api/health/ready_get.rs

Lines changed: 124 additions & 3 deletions
@@ -1,7 +1,92 @@
-//! Implementation of the GET /health/ready endpoint
+//! # Implementation of the GET /health/ready endpoint
+//!
+//! This module provides an HTTP endpoint to monitor the readiness of the service's
+//! database connections and attempt to reconnect to any databases that are not currently
+//! live. It uses the `LIVE_INDEX_DB` and `LIVE_EVENT_DB` atomic booleans defined in the
+//! parent module to track the status of the Index DB and Event DB, respectively.
+//!
+//! ## Key Features
+//!
+//! 1. **Reconnection Logic**:
+//!    - If either `LIVE_INDEX_DB` or `LIVE_EVENT_DB` is `false`, the service will attempt
+//!      to reconnect to the corresponding database.
+//!    - If the reconnection attempt is successful, the respective flag (`LIVE_INDEX_DB`
+//!      or `LIVE_EVENT_DB`) is set to `true`.
+//!    - If the reconnection attempt fails, the flag remains `false`.
+//!
+//! 2. **Readiness Check Logic**:
+//!    - After attempting to reconnect to any non-live databases, the endpoint checks the
+//!      status of both `LIVE_INDEX_DB` and `LIVE_EVENT_DB`.
+//!    - If both flags are `true`, the endpoint returns a `204 No Content` response,
+//!      indicating that all databases are live and the service is healthy.
+//!    - If either flag is `false`, the endpoint returns a `503 Service Unavailable`
+//!      response, indicating that at least one database is not live and the service is
+//!      unhealthy.
+//!
+//! ## How It Works
+//!
+//! - When the endpoint is called, it first checks the status of `LIVE_INDEX_DB` and
+//!   `LIVE_EVENT_DB`.
+//! - For any database that is not live (i.e., its flag is `false`), the service attempts
+//!   to reconnect to that database.
+//! - If the reconnection attempt is successful, the corresponding flag is set to `true`.
+//! - After attempting to reconnect, the endpoint checks the status of both flags:
+//!   - If both `LIVE_INDEX_DB` and `LIVE_EVENT_DB` are `true`, the endpoint returns `204
+//!     No Content`.
+//!   - If either flag is `false`, the endpoint returns `503 Service Unavailable`.
+//!
+//! ## Example Scenarios
+//!
+//! 1. **Both Databases Live**:
+//!    - `LIVE_INDEX_DB` and `LIVE_EVENT_DB` are both `true`.
+//!    - No reconnection attempts are made.
+//!    - The endpoint returns `204 No Content`.
+//!
+//! 2. **Index DB Not Live, Reconnection Successful**:
+//!    - `LIVE_INDEX_DB` is `false`, and `LIVE_EVENT_DB` is `true`.
+//!    - The service attempts to reconnect to the Index DB and succeeds.
+//!    - `LIVE_INDEX_DB` is set to `true`.
+//!    - The endpoint returns `204 No Content`.
+//!
+//! 3. **Event DB Not Live, Reconnection Fails**:
+//!    - `LIVE_INDEX_DB` is `true`, and `LIVE_EVENT_DB` is `false`.
+//!    - The service attempts to reconnect to the Event DB but fails.
+//!    - `LIVE_EVENT_DB` remains `false`.
+//!    - The endpoint returns `503 Service Unavailable`.
+//!
+//! 4. **Both Databases Not Live, Reconnection Partially Successful**:
+//!    - `LIVE_INDEX_DB` and `LIVE_EVENT_DB` are both `false`.
+//!    - The service attempts to reconnect to both databases.
+//!    - Reconnection to the Index DB succeeds (`LIVE_INDEX_DB` is set to `true`), but
+//!      reconnection to the Event DB fails (`LIVE_EVENT_DB` remains `false`).
+//!    - The endpoint returns `503 Service Unavailable`.
+//!
+//! ## Notes
+//!
+//! - The reconnection logic ensures that the service actively attempts to restore
+//!   connectivity to any non-live databases, improving robustness and reliability.
+//! - The atomic booleans (`LIVE_INDEX_DB` and `LIVE_EVENT_DB`) are thread-safe, allowing
+//!   concurrent access without issues.
+//! - This endpoint is useful for monitoring and automatically recovering from transient
+//!   database connectivity issues, ensuring that the service remains operational whenever
+//!   possible.
+//!
+//! This endpoint complements the initialization and readiness monitoring endpoints by
+//! providing ongoing connectivity checks and recovery attempts for the service's
+//! databases.
 use poem_openapi::ApiResponse;
 
-use crate::service::common::responses::WithErrorResponses;
+use crate::{
+    cardano::INDEXING_DB_READY_WAIT_INTERVAL,
+    db::{
+        event::{establish_connection, EventDB},
+        index::session::CassandraSession,
+    },
+    service::{
+        common::{responses::WithErrorResponses, types::headers::retry_after::RetryAfterOption},
+        utilities::health::{event_db_is_live, index_db_is_live, set_event_db_liveness},
+    },
+};
 
 /// Endpoint responses.
 #[derive(ApiResponse)]
@@ -35,5 +120,41 @@ pub(crate) type AllResponses = WithErrorResponses<Responses>;
 /// service that are ready.
 #[allow(clippy::unused_async)]
 pub(crate) async fn endpoint() -> AllResponses {
-    Responses::NoContent.into()
+    // Check Event DB connection
+    let event_db_live = event_db_is_live();
+
+    // When check fails, attempt to re-connect
+    if !event_db_live {
+        establish_connection();
+        // Re-check, and update Event DB service liveness flag
+        set_event_db_liveness(EventDB::connection_is_ok());
+    };
+
+    // Check Index DB connection
+    let index_db_live = index_db_is_live();
+
+    // When check fails, attempt to re-connect
+    if !index_db_live {
+        CassandraSession::init();
+        // Re-check connection to Indexing DB (internally updates the liveness flag)
+        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
+    }
+
+    let success_response = Responses::NoContent.into();
+
+    // Return 204 response if check passed initially.
+    if index_db_live && event_db_live {
+        return success_response;
+    }
+
+    // Otherwise, re-check, and return 204 response if all is good.
+    if index_db_is_live() && event_db_is_live() {
+        return success_response;
+    }
+
+    // Otherwise, return 503 response.
+    AllResponses::service_unavailable(
+        &anyhow::anyhow!("Service is not ready, do not send other requests."),
+        RetryAfterOption::Default,
+    )
 }
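
The readiness handler above leans on `event_db_is_live`, `index_db_is_live`, `set_event_db_liveness`, and `set_index_db_liveness`, and its doc comment names the `LIVE_EVENT_DB` and `LIVE_INDEX_DB` atomics they wrap; the definitions themselves are outside this diff. A minimal sketch of that pair of flags, assuming they are plain `AtomicBool` statics in `service::utilities::health` (initial values assumed):

    use std::sync::atomic::{AtomicBool, Ordering};

    /// Is the connection to the Event DB (PostgreSQL) believed to be live?
    static LIVE_EVENT_DB: AtomicBool = AtomicBool::new(false);
    /// Is the connection to the Index DB (Cassandra) believed to be live?
    static LIVE_INDEX_DB: AtomicBool = AtomicBool::new(false);

    pub(crate) fn event_db_is_live() -> bool {
        LIVE_EVENT_DB.load(Ordering::SeqCst)
    }

    pub(crate) fn set_event_db_liveness(live: bool) {
        LIVE_EVENT_DB.store(live, Ordering::SeqCst);
    }

    pub(crate) fn index_db_is_live() -> bool {
        LIVE_INDEX_DB.load(Ordering::SeqCst)
    }

    pub(crate) fn set_index_db_liveness(live: bool) {
        LIVE_INDEX_DB.store(live, Ordering::SeqCst);
    }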
