Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions engine/packages/epoxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ rivet-api-builder.workspace = true
rivet-config.workspace = true
rivet-error.workspace = true
rivet-pools.workspace = true
rivet-metrics.workspace = true
rivet-util.workspace = true
serde.workspace = true
serde_bare.workspace = true
Expand Down
13 changes: 12 additions & 1 deletion engine/packages/epoxy/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use rivet_api_builder::ApiCtx;
use std::future::Future;
use vbare::OwnedVersionedData;

use crate::utils;
use crate::{metrics, utils};

/// Find the API replica URL for a given replica ID in the topology
fn find_replica_address(
Expand Down Expand Up @@ -92,6 +92,17 @@ where
}
}

metrics::QUORUM_ATTEMPTS_TOTAL
.with_label_values(&[
quorum_type.to_string().as_str(),
if successful_responses.len() == target_responses {
"ok"
} else {
"insufficient_responses"
},
])
.inc();

Ok(successful_responses)
}

Expand Down
1 change: 1 addition & 0 deletions engine/packages/epoxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod errors;
pub mod http_client;
pub mod http_routes;
pub mod keys;
pub mod metrics;
pub mod ops;
pub mod replica;
pub mod types;
Expand Down
110 changes: 110 additions & 0 deletions engine/packages/epoxy/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use rivet_metrics::{BUCKETS, REGISTRY, prometheus::*};

lazy_static::lazy_static! {
// MARK: Consensus Operations
pub static ref PROPOSALS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"epoxy_proposals_total",
"Total number of proposals.",
&["status"],
*REGISTRY
).unwrap();

pub static ref PRE_ACCEPT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"epoxy_pre_accept_total",
"Total number of pre-accept operations.",
&["result"],
*REGISTRY
).unwrap();

pub static ref ACCEPT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"epoxy_accept_total",
"Total number of accept operations.",
&["result"],
*REGISTRY
).unwrap();

pub static ref COMMIT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"epoxy_commit_total",
"Total number of commit operations.",
&["result"],
*REGISTRY
).unwrap();

pub static ref PROPOSAL_DURATION: Histogram = register_histogram_with_registry!(
"epoxy_proposal_duration",
"Duration from propose to commit in seconds.",
BUCKETS.to_vec(),
*REGISTRY
).unwrap();

// MARK: Message Handling
pub static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"epoxy_requests_total",
"Total number of requests.",
&["request_type", "result"],
*REGISTRY
).unwrap();

pub static ref REQUEST_DURATION: HistogramVec = register_histogram_vec_with_registry!(
"epoxy_request_duration",
"Duration of request handling in seconds.",
&["request_type"],
BUCKETS.to_vec(),
*REGISTRY
).unwrap();

// MARK: Quorum
pub static ref QUORUM_ATTEMPTS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"epoxy_quorum_attempts_total",
"Total number of quorum attempts.",
&["quorum_type", "result"],
*REGISTRY
).unwrap();

pub static ref QUORUM_SIZE: IntGaugeVec = register_int_gauge_vec_with_registry!(
"epoxy_quorum_size",
"Current quorum size.",
&["quorum_type"],
*REGISTRY
).unwrap();

// MARK: Replica State
pub static ref BALLOT_EPOCH: IntGauge = register_int_gauge_with_registry!(
"epoxy_ballot_epoch",
"Current ballot epoch.",
*REGISTRY
).unwrap();

pub static ref BALLOT_NUMBER: IntGauge = register_int_gauge_with_registry!(
"epoxy_ballot_number",
"Current ballot number.",
*REGISTRY
).unwrap();

pub static ref INSTANCE_NUMBER: IntGauge = register_int_gauge_with_registry!(
"epoxy_instance_number",
"Current instance/slot number.",
*REGISTRY
).unwrap();

// MARK: Cluster Health
pub static ref REPLICAS_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
"epoxy_replicas_total",
"Total number of replicas.",
&["status"],
*REGISTRY
).unwrap();

// MARK: Errors
pub static ref BALLOT_REJECTIONS_TOTAL: IntCounter = register_int_counter_with_registry!(
"epoxy_ballot_rejections_total",
"Total number of ballot rejections due to stale ballot.",
*REGISTRY
).unwrap();

pub static ref INTERFERENCE_DETECTED_TOTAL: IntCounter = register_int_counter_with_registry!(
"epoxy_interference_detected_total",
"Total number of key conflict interferences detected.",
*REGISTRY
).unwrap();
}
23 changes: 18 additions & 5 deletions engine/packages/epoxy/src/ops/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use base64::engine::general_purpose::STANDARD as BASE64;
use epoxy_protocol::protocol::{self, Path, Payload, ReplicaId};
use gas::prelude::*;
use rivet_api_builder::prelude::*;
use std::time::Instant;

use crate::{http_client, replica, utils};
use crate::{http_client, metrics, replica, utils};

#[derive(Debug, Serialize, Deserialize)]
pub enum ProposalResult {
Expand All @@ -31,6 +32,7 @@ pub struct Input {

#[operation]
pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<ProposalResult> {
let start = Instant::now();
let replica_id = ctx.config().epoxy_replica_id();

// Read config
Expand Down Expand Up @@ -71,9 +73,9 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
.await
.context("failed deciding path")?;

match path {
let res = match path {
Path::PathFast(protocol::PathFast { payload }) => {
commit(ctx, &config, replica_id, payload, input.purge_cache).await
commit(ctx, &config, replica_id, payload, input.purge_cache).await?
}
Path::PathSlow(protocol::PathSlow { payload }) => {
run_paxos_accept(
Expand All @@ -84,9 +86,20 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
payload,
input.purge_cache,
)
.await
.await?
}
}
};

metrics::PROPOSAL_DURATION.observe(start.elapsed().as_secs_f64());
metrics::PROPOSALS_TOTAL
.with_label_values(&[if let ProposalResult::Committed = res {
"ok"
} else {
"err"
}])
.inc();

Ok(res)
}

#[tracing::instrument(skip_all)]
Expand Down
9 changes: 8 additions & 1 deletion engine/packages/epoxy/src/replica/ballot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use epoxy_protocol::protocol;
use universaldb::Transaction;
use universaldb::utils::{FormalKey, IsolationLevel::*};

use crate::keys;
use crate::{keys, metrics};

/// Get the current ballot for this replica
#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -50,6 +50,9 @@ pub async fn increment_ballot(

tx.set(&packed_key, &serialized);

metrics::BALLOT_EPOCH.set(current_ballot.epoch as i64);
metrics::BALLOT_NUMBER.set(current_ballot.ballot as i64);

Ok(current_ballot)
}

Expand Down Expand Up @@ -121,6 +124,10 @@ pub async fn validate_and_update_ballot_for_instance(
tracing::debug!(?ballot, ?instance, "updated highest ballot for instance");
}

if !is_valid {
metrics::BALLOT_REJECTIONS_TOTAL.inc();
}

Ok(BallotValidationResult {
is_valid,
incoming_ballot: ballot.clone(),
Expand Down
4 changes: 3 additions & 1 deletion engine/packages/epoxy/src/replica/lead_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use epoxy_protocol::protocol;
use universaldb::Transaction;
use universaldb::utils::{FormalKey, IsolationLevel::*};

use crate::keys;
use crate::replica::{ballot, utils};
use crate::{keys, metrics};

#[tracing::instrument(skip_all)]
pub async fn lead_consensus(
Expand Down Expand Up @@ -55,6 +55,8 @@ pub async fn lead_consensus(
};
crate::replica::update_log(tx, replica_id, log_entry, &instance).await?;

metrics::INSTANCE_NUMBER.set(instance.slot_id as i64);

// Return payload
Ok(protocol::Payload {
proposal,
Expand Down
36 changes: 35 additions & 1 deletion engine/packages/epoxy/src/replica/message_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,47 @@ use anyhow::*;
use epoxy_protocol::protocol::{self};
use gas::prelude::*;
use rivet_api_builder::prelude::*;
use std::time::Instant;

use crate::{ops, replica};
use crate::{metrics, ops, replica};

#[tracing::instrument(skip_all)]
pub async fn message_request(
ctx: &ApiCtx,
request: protocol::Request,
) -> Result<protocol::Response> {
let start = Instant::now();
let request_type = match &request.kind {
protocol::RequestKind::UpdateConfigRequest(_) => "update_config",
protocol::RequestKind::PrepareRequest(_) => "prepare",
protocol::RequestKind::PreAcceptRequest(_) => "pre_accept",
protocol::RequestKind::AcceptRequest(_) => "accept",
protocol::RequestKind::CommitRequest(_) => "commit",
protocol::RequestKind::DownloadInstancesRequest(_) => "download_instances",
protocol::RequestKind::HealthCheckRequest => "health_check",
protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => {
"coordinator_update_replica_status"
}
protocol::RequestKind::BeginLearningRequest(_) => "begin_learning",
protocol::RequestKind::KvGetRequest(_) => "kv_get",
protocol::RequestKind::KvPurgeRequest(_) => "kv_purge",
};
let res = message_request_inner(ctx, request).await;

metrics::REQUEST_DURATION
.with_label_values(&[request_type])
.observe(start.elapsed().as_secs_f64());
metrics::REQUESTS_TOTAL
.with_label_values(&[request_type, if res.is_ok() { "ok" } else { "err" }])
.inc();

res
}

#[tracing::instrument(skip_all)]
async fn message_request_inner(
ctx: &ApiCtx,
request: protocol::Request,
) -> Result<protocol::Response> {
let current_replica_id = ctx.config().epoxy_replica_id();

Expand Down
11 changes: 10 additions & 1 deletion engine/packages/epoxy/src/replica/messages/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Result, ensure};
use epoxy_protocol::protocol;
use universaldb::Transaction;

use crate::replica::ballot;
use crate::{metrics, replica::ballot};

#[tracing::instrument(skip_all)]
pub async fn accept(
Expand All @@ -24,6 +24,13 @@ pub async fn accept(
let validation =
ballot::validate_and_update_ballot_for_instance(tx, replica_id, &current_ballot, &instance)
.await?;

if !validation.is_valid {
metrics::ACCEPT_TOTAL
.with_label_values(&["invalid_ballot"])
.inc();
}

ensure!(
validation.is_valid,
"ballot validation failed for accept: incoming ballot {:?} is not greater than stored ballot {:?} for instance {:?} (comparison: {:?})",
Expand All @@ -43,6 +50,8 @@ pub async fn accept(
};
crate::replica::update_log(tx, replica_id, log_entry, &instance).await?;

metrics::ACCEPT_TOTAL.with_label_values(&["ok"]).inc();

// EPaxos Step 19
Ok(protocol::AcceptResponse {
payload: protocol::AcceptOKPayload { proposal, instance },
Expand Down
6 changes: 5 additions & 1 deletion engine/packages/epoxy/src/replica/messages/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Result;
use epoxy_protocol::protocol;
use universaldb::Transaction;

use crate::replica::ballot;
use crate::{metrics, replica::ballot};

// EPaxos Step 24
#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -40,6 +40,10 @@ pub async fn commit(
None
};

metrics::COMMIT_TOTAL
.with_label_values(&[if cmd_err.is_none() { "ok" } else { "cmd_err" }])
.inc();

tracing::debug!(?replica_id, ?instance, ?cmd_err, "committed");

Ok(())
Expand Down
Loading
Loading