diff --git a/Cargo.lock b/Cargo.lock index 8a4b36a1d6..b97980c1c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1517,6 +1517,7 @@ dependencies = [ "rivet-api-builder", "rivet-config", "rivet-error", + "rivet-metrics", "rivet-pools", "rivet-runtime", "rivet-test-deps", diff --git a/engine/packages/epoxy/Cargo.toml b/engine/packages/epoxy/Cargo.toml index 3ef3bea138..366fd7f63e 100644 --- a/engine/packages/epoxy/Cargo.toml +++ b/engine/packages/epoxy/Cargo.toml @@ -19,6 +19,7 @@ rivet-api-builder.workspace = true rivet-config.workspace = true rivet-error.workspace = true rivet-pools.workspace = true +rivet-metrics.workspace = true rivet-util.workspace = true serde.workspace = true serde_bare.workspace = true diff --git a/engine/packages/epoxy/src/http_client.rs b/engine/packages/epoxy/src/http_client.rs index 2b5a859783..1eee2eee76 100644 --- a/engine/packages/epoxy/src/http_client.rs +++ b/engine/packages/epoxy/src/http_client.rs @@ -10,7 +10,7 @@ use rivet_api_builder::ApiCtx; use std::future::Future; use vbare::OwnedVersionedData; -use crate::utils; +use crate::{metrics, utils}; /// Find the API replica URL for a given replica ID in the topology fn find_replica_address( @@ -92,6 +92,17 @@ where } } + metrics::QUORUM_ATTEMPTS_TOTAL + .with_label_values(&[ + quorum_type.to_string().as_str(), + if successful_responses.len() == target_responses { + "ok" + } else { + "insufficient_responses" + }, + ]) + .inc(); + Ok(successful_responses) } diff --git a/engine/packages/epoxy/src/lib.rs b/engine/packages/epoxy/src/lib.rs index 465e853812..3b3125906b 100644 --- a/engine/packages/epoxy/src/lib.rs +++ b/engine/packages/epoxy/src/lib.rs @@ -5,6 +5,7 @@ pub mod errors; pub mod http_client; pub mod http_routes; pub mod keys; +pub mod metrics; pub mod ops; pub mod replica; pub mod types; diff --git a/engine/packages/epoxy/src/metrics.rs b/engine/packages/epoxy/src/metrics.rs new file mode 100644 index 0000000000..089415026f --- /dev/null +++ b/engine/packages/epoxy/src/metrics.rs @@ -0,0 +1,110 @@ +use rivet_metrics::{BUCKETS, REGISTRY, prometheus::*}; + +lazy_static::lazy_static! { + // MARK: Consensus Operations + pub static ref PROPOSALS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "epoxy_proposals_total", + "Total number of proposals.", + &["status"], + *REGISTRY + ).unwrap(); + + pub static ref PRE_ACCEPT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "epoxy_pre_accept_total", + "Total number of pre-accept operations.", + &["result"], + *REGISTRY + ).unwrap(); + + pub static ref ACCEPT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "epoxy_accept_total", + "Total number of accept operations.", + &["result"], + *REGISTRY + ).unwrap(); + + pub static ref COMMIT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "epoxy_commit_total", + "Total number of commit operations.", + &["result"], + *REGISTRY + ).unwrap(); + + pub static ref PROPOSAL_DURATION: Histogram = register_histogram_with_registry!( + "epoxy_proposal_duration", + "Duration from propose to commit in seconds.", + BUCKETS.to_vec(), + *REGISTRY + ).unwrap(); + + // MARK: Message Handling + pub static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "epoxy_requests_total", + "Total number of requests.", + &["request_type", "result"], + *REGISTRY + ).unwrap(); + + pub static ref REQUEST_DURATION: HistogramVec = register_histogram_vec_with_registry!( + "epoxy_request_duration", + "Duration of request handling in seconds.", + &["request_type"], + BUCKETS.to_vec(), + *REGISTRY + ).unwrap(); + + // MARK: Quorum + pub static ref QUORUM_ATTEMPTS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "epoxy_quorum_attempts_total", + "Total number of quorum attempts.", + &["quorum_type", "result"], + *REGISTRY + ).unwrap(); + + pub static ref QUORUM_SIZE: IntGaugeVec = register_int_gauge_vec_with_registry!( + "epoxy_quorum_size", + "Current quorum size.", + &["quorum_type"], + *REGISTRY + ).unwrap(); + + // MARK: Replica State + pub static ref BALLOT_EPOCH: IntGauge = register_int_gauge_with_registry!( + "epoxy_ballot_epoch", + "Current ballot epoch.", + *REGISTRY + ).unwrap(); + + pub static ref BALLOT_NUMBER: IntGauge = register_int_gauge_with_registry!( + "epoxy_ballot_number", + "Current ballot number.", + *REGISTRY + ).unwrap(); + + pub static ref INSTANCE_NUMBER: IntGauge = register_int_gauge_with_registry!( + "epoxy_instance_number", + "Current instance/slot number.", + *REGISTRY + ).unwrap(); + + // MARK: Cluster Health + pub static ref REPLICAS_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!( + "epoxy_replicas_total", + "Total number of replicas.", + &["status"], + *REGISTRY + ).unwrap(); + + // MARK: Errors + pub static ref BALLOT_REJECTIONS_TOTAL: IntCounter = register_int_counter_with_registry!( + "epoxy_ballot_rejections_total", + "Total number of ballot rejections due to stale ballot.", + *REGISTRY + ).unwrap(); + + pub static ref INTERFERENCE_DETECTED_TOTAL: IntCounter = register_int_counter_with_registry!( + "epoxy_interference_detected_total", + "Total number of key conflict interferences detected.", + *REGISTRY + ).unwrap(); +} diff --git a/engine/packages/epoxy/src/ops/propose.rs b/engine/packages/epoxy/src/ops/propose.rs index 62c443a518..87e05f278f 100644 --- a/engine/packages/epoxy/src/ops/propose.rs +++ b/engine/packages/epoxy/src/ops/propose.rs @@ -4,8 +4,9 @@ use base64::engine::general_purpose::STANDARD as BASE64; use epoxy_protocol::protocol::{self, Path, Payload, ReplicaId}; use gas::prelude::*; use rivet_api_builder::prelude::*; +use std::time::Instant; -use crate::{http_client, replica, utils}; +use crate::{http_client, metrics, replica, utils}; #[derive(Debug, Serialize, Deserialize)] pub enum ProposalResult { @@ -31,6 +32,7 @@ pub struct Input { #[operation] pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result { + let start = Instant::now(); let replica_id = ctx.config().epoxy_replica_id(); // Read config @@ -71,9 +73,9 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result { - commit(ctx, &config, replica_id, payload, input.purge_cache).await + commit(ctx, &config, replica_id, payload, input.purge_cache).await? } Path::PathSlow(protocol::PathSlow { payload }) => { run_paxos_accept( @@ -84,9 +86,20 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result Result { + let start = Instant::now(); + let request_type = match &request.kind { + protocol::RequestKind::UpdateConfigRequest(_) => "update_config", + protocol::RequestKind::PrepareRequest(_) => "prepare", + protocol::RequestKind::PreAcceptRequest(_) => "pre_accept", + protocol::RequestKind::AcceptRequest(_) => "accept", + protocol::RequestKind::CommitRequest(_) => "commit", + protocol::RequestKind::DownloadInstancesRequest(_) => "download_instances", + protocol::RequestKind::HealthCheckRequest => "health_check", + protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => { + "coordinator_update_replica_status" + } + protocol::RequestKind::BeginLearningRequest(_) => "begin_learning", + protocol::RequestKind::KvGetRequest(_) => "kv_get", + protocol::RequestKind::KvPurgeRequest(_) => "kv_purge", + }; + let res = message_request_inner(ctx, request).await; + + metrics::REQUEST_DURATION + .with_label_values(&[request_type]) + .observe(start.elapsed().as_secs_f64()); + metrics::REQUESTS_TOTAL + .with_label_values(&[request_type, if res.is_ok() { "ok" } else { "err" }]) + .inc(); + + res +} + +#[tracing::instrument(skip_all)] +async fn message_request_inner( + ctx: &ApiCtx, + request: protocol::Request, ) -> Result { let current_replica_id = ctx.config().epoxy_replica_id(); diff --git a/engine/packages/epoxy/src/replica/messages/accept.rs b/engine/packages/epoxy/src/replica/messages/accept.rs index 90c15d48f9..e6af7b664a 100644 --- a/engine/packages/epoxy/src/replica/messages/accept.rs +++ b/engine/packages/epoxy/src/replica/messages/accept.rs @@ -2,7 +2,7 @@ use anyhow::{Result, ensure}; use epoxy_protocol::protocol; use universaldb::Transaction; -use crate::replica::ballot; +use crate::{metrics, replica::ballot}; #[tracing::instrument(skip_all)] pub async fn accept( @@ -24,6 +24,13 @@ pub async fn accept( let validation = ballot::validate_and_update_ballot_for_instance(tx, replica_id, ¤t_ballot, &instance) .await?; + + if !validation.is_valid { + metrics::ACCEPT_TOTAL + .with_label_values(&["invalid_ballot"]) + .inc(); + } + ensure!( validation.is_valid, "ballot validation failed for accept: incoming ballot {:?} is not greater than stored ballot {:?} for instance {:?} (comparison: {:?})", @@ -43,6 +50,8 @@ pub async fn accept( }; crate::replica::update_log(tx, replica_id, log_entry, &instance).await?; + metrics::ACCEPT_TOTAL.with_label_values(&["ok"]).inc(); + // EPaxos Step 19 Ok(protocol::AcceptResponse { payload: protocol::AcceptOKPayload { proposal, instance }, diff --git a/engine/packages/epoxy/src/replica/messages/commit.rs b/engine/packages/epoxy/src/replica/messages/commit.rs index 01ca1408e7..f0af1f7650 100644 --- a/engine/packages/epoxy/src/replica/messages/commit.rs +++ b/engine/packages/epoxy/src/replica/messages/commit.rs @@ -2,7 +2,7 @@ use anyhow::Result; use epoxy_protocol::protocol; use universaldb::Transaction; -use crate::replica::ballot; +use crate::{metrics, replica::ballot}; // EPaxos Step 24 #[tracing::instrument(skip_all)] @@ -40,6 +40,10 @@ pub async fn commit( None }; + metrics::COMMIT_TOTAL + .with_label_values(&[if cmd_err.is_none() { "ok" } else { "cmd_err" }]) + .inc(); + tracing::debug!(?replica_id, ?instance, ?cmd_err, "committed"); Ok(()) diff --git a/engine/packages/epoxy/src/replica/messages/pre_accept.rs b/engine/packages/epoxy/src/replica/messages/pre_accept.rs index 69091386b1..8b2faeb169 100644 --- a/engine/packages/epoxy/src/replica/messages/pre_accept.rs +++ b/engine/packages/epoxy/src/replica/messages/pre_accept.rs @@ -3,7 +3,10 @@ use epoxy_protocol::protocol; use std::cmp; use universaldb::Transaction; -use crate::replica::{ballot, utils}; +use crate::{ + metrics, + replica::{ballot, utils}, +}; #[tracing::instrument(skip_all)] pub async fn pre_accept( @@ -27,6 +30,13 @@ pub async fn pre_accept( let validation = ballot::validate_and_update_ballot_for_instance(tx, replica_id, ¤t_ballot, &instance) .await?; + + if !validation.is_valid { + metrics::PRE_ACCEPT_TOTAL + .with_label_values(&["invalid_ballot"]) + .inc(); + } + ensure!( validation.is_valid, "ballot validation failed for pre_accept: incoming ballot {:?} is not greater than stored ballot {:?} for instance {:?} (comparison: {:?})", @@ -57,6 +67,8 @@ pub async fn pre_accept( }; crate::replica::update_log(tx, replica_id, log_entry, &instance).await?; + metrics::PRE_ACCEPT_TOTAL.with_label_values(&["ok"]).inc(); + // EPaxos Step 9 Ok(protocol::PreAcceptResponse { payload: protocol::Payload { diff --git a/engine/packages/epoxy/src/replica/update_config.rs b/engine/packages/epoxy/src/replica/update_config.rs index c54a26ee2e..380022f6cd 100644 --- a/engine/packages/epoxy/src/replica/update_config.rs +++ b/engine/packages/epoxy/src/replica/update_config.rs @@ -3,7 +3,7 @@ use epoxy_protocol::protocol::{self, ReplicaId}; use universaldb::Transaction; use universaldb::utils::FormalKey; -use crate::keys; +use crate::{keys, metrics, utils}; pub fn update_config( tx: &Transaction, @@ -12,6 +12,33 @@ pub fn update_config( ) -> Result<()> { tracing::debug!("updating config"); + metrics::REPLICAS_TOTAL.reset(); + for replica in &update_config_req.config.replicas { + metrics::REPLICAS_TOTAL + .with_label_values(&[match replica.status { + protocol::ReplicaStatus::Active => "active", + protocol::ReplicaStatus::Learning => "learning", + protocol::ReplicaStatus::Joining => "joining", + }]) + .inc(); + } + + let quorum_members = utils::get_quorum_members(&update_config_req.config); + let quorum_member_count = quorum_members.len(); + + metrics::QUORUM_SIZE + .with_label_values(&["fast"]) + .set(utils::calculate_quorum(quorum_member_count, utils::QuorumType::Fast) as i64); + metrics::QUORUM_SIZE + .with_label_values(&["slow"]) + .set(utils::calculate_quorum(quorum_member_count, utils::QuorumType::Slow) as i64); + metrics::QUORUM_SIZE + .with_label_values(&["all"]) + .set(utils::calculate_quorum(quorum_member_count, utils::QuorumType::All) as i64); + metrics::QUORUM_SIZE + .with_label_values(&["any"]) + .set(utils::calculate_quorum(quorum_member_count, utils::QuorumType::Any) as i64); + // Store config in UDB let config_key = keys::replica::ConfigKey; let subspace = keys::subspace(replica_id); diff --git a/engine/packages/epoxy/src/replica/utils.rs b/engine/packages/epoxy/src/replica/utils.rs index f867c78f0a..30cac9a31b 100644 --- a/engine/packages/epoxy/src/replica/utils.rs +++ b/engine/packages/epoxy/src/replica/utils.rs @@ -5,7 +5,7 @@ use std::{cmp::Ordering, collections::HashSet}; use universaldb::prelude::*; use universaldb::{KeySelector, RangeOption, Transaction, options::StreamingMode}; -use crate::keys; +use crate::{keys, metrics}; // Helper function to find interference for a key #[tracing::instrument(skip_all)] @@ -50,6 +50,10 @@ pub async fn find_interference( } } + if !interf.is_empty() { + metrics::INTERFERENCE_DETECTED_TOTAL.inc(); + } + interf.sort_by(sort_instances); Ok(interf) } diff --git a/engine/packages/epoxy/src/utils.rs b/engine/packages/epoxy/src/utils.rs index 5e7c6ec571..cd7f51953a 100644 --- a/engine/packages/epoxy/src/utils.rs +++ b/engine/packages/epoxy/src/utils.rs @@ -1,5 +1,6 @@ use anyhow::*; use epoxy_protocol::protocol::{self, ReplicaId}; +use std::fmt; use universaldb::{Transaction, utils::IsolationLevel::*}; #[derive(Clone, Copy, Debug)] @@ -10,6 +11,17 @@ pub enum QuorumType { Any, } +impl fmt::Display for QuorumType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + QuorumType::Fast => write!(f, "fast"), + QuorumType::Slow => write!(f, "slow"), + QuorumType::All => write!(f, "all"), + QuorumType::Any => write!(f, "any"), + } + } +} + pub enum ReplicaFilter { All, Active,