Skip to content

Commit cb0fad3

Browse files
committed
fix(epoxy): add metrics
1 parent 830c32d commit cb0fad3

File tree

15 files changed

+262
-14
lines changed

15 files changed

+262
-14
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/epoxy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ rivet-api-builder.workspace = true
1919
rivet-config.workspace = true
2020
rivet-error.workspace = true
2121
rivet-pools.workspace = true
22+
rivet-metrics.workspace = true
2223
rivet-util.workspace = true
2324
serde.workspace = true
2425
serde_bare.workspace = true

engine/packages/epoxy/src/http_client.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use rivet_api_builder::ApiCtx;
1010
use std::future::Future;
1111
use vbare::OwnedVersionedData;
1212

13-
use crate::utils;
13+
use crate::{metrics, utils};
1414

1515
/// Find the API replica URL for a given replica ID in the topology
1616
fn find_replica_address(
@@ -92,6 +92,17 @@ where
9292
}
9393
}
9494

95+
metrics::QUORUM_ATTEMPTS_TOTAL
96+
.with_label_values(&[
97+
quorum_type.to_string().as_str(),
98+
if successful_responses.len() == target_responses {
99+
"ok"
100+
} else {
101+
"insufficient_responses"
102+
},
103+
])
104+
.inc();
105+
95106
Ok(successful_responses)
96107
}
97108

engine/packages/epoxy/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod errors;
55
pub mod http_client;
66
pub mod http_routes;
77
pub mod keys;
8+
pub mod metrics;
89
pub mod ops;
910
pub mod replica;
1011
pub mod types;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use rivet_metrics::{BUCKETS, REGISTRY, prometheus::*};
2+
3+
lazy_static::lazy_static! {
4+
// MARK: Consensus Operations
5+
pub static ref PROPOSALS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
6+
"epoxy_proposals_total",
7+
"Total number of proposals.",
8+
&["status"],
9+
*REGISTRY
10+
).unwrap();
11+
12+
pub static ref PRE_ACCEPT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
13+
"epoxy_pre_accept_total",
14+
"Total number of pre-accept operations.",
15+
&["result"],
16+
*REGISTRY
17+
).unwrap();
18+
19+
pub static ref ACCEPT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
20+
"epoxy_accept_total",
21+
"Total number of accept operations.",
22+
&["result"],
23+
*REGISTRY
24+
).unwrap();
25+
26+
pub static ref COMMIT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
27+
"epoxy_commit_total",
28+
"Total number of commit operations.",
29+
&["result"],
30+
*REGISTRY
31+
).unwrap();
32+
33+
pub static ref PROPOSAL_DURATION: Histogram = register_histogram_with_registry!(
34+
"epoxy_proposal_duration",
35+
"Duration from propose to commit in seconds.",
36+
BUCKETS.to_vec(),
37+
*REGISTRY
38+
).unwrap();
39+
40+
// MARK: Message Handling
41+
pub static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
42+
"epoxy_requests_total",
43+
"Total number of requests.",
44+
&["request_type", "result"],
45+
*REGISTRY
46+
).unwrap();
47+
48+
pub static ref REQUEST_DURATION: HistogramVec = register_histogram_vec_with_registry!(
49+
"epoxy_request_duration",
50+
"Duration of request handling in seconds.",
51+
&["request_type"],
52+
BUCKETS.to_vec(),
53+
*REGISTRY
54+
).unwrap();
55+
56+
// MARK: Quorum
57+
pub static ref QUORUM_ATTEMPTS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
58+
"epoxy_quorum_attempts_total",
59+
"Total number of quorum attempts.",
60+
&["quorum_type", "result"],
61+
*REGISTRY
62+
).unwrap();
63+
64+
pub static ref QUORUM_SIZE: IntGaugeVec = register_int_gauge_vec_with_registry!(
65+
"epoxy_quorum_size",
66+
"Current quorum size.",
67+
&["quorum_type"],
68+
*REGISTRY
69+
).unwrap();
70+
71+
// MARK: Replica State
72+
pub static ref BALLOT_EPOCH: IntGauge = register_int_gauge_with_registry!(
73+
"epoxy_ballot_epoch",
74+
"Current ballot epoch.",
75+
*REGISTRY
76+
).unwrap();
77+
78+
pub static ref BALLOT_NUMBER: IntGauge = register_int_gauge_with_registry!(
79+
"epoxy_ballot_number",
80+
"Current ballot number.",
81+
*REGISTRY
82+
).unwrap();
83+
84+
pub static ref INSTANCE_NUMBER: IntGauge = register_int_gauge_with_registry!(
85+
"epoxy_instance_number",
86+
"Current instance/slot number.",
87+
*REGISTRY
88+
).unwrap();
89+
90+
// MARK: Cluster Health
91+
pub static ref REPLICAS_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
92+
"epoxy_replicas_total",
93+
"Total number of replicas.",
94+
&["status"],
95+
*REGISTRY
96+
).unwrap();
97+
98+
// MARK: Errors
99+
pub static ref BALLOT_REJECTIONS_TOTAL: IntCounter = register_int_counter_with_registry!(
100+
"epoxy_ballot_rejections_total",
101+
"Total number of ballot rejections due to stale ballot.",
102+
*REGISTRY
103+
).unwrap();
104+
105+
pub static ref INTERFERENCE_DETECTED_TOTAL: IntCounter = register_int_counter_with_registry!(
106+
"epoxy_interference_detected_total",
107+
"Total number of key conflict interferences detected.",
108+
*REGISTRY
109+
).unwrap();
110+
}

engine/packages/epoxy/src/ops/propose.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ use base64::engine::general_purpose::STANDARD as BASE64;
44
use epoxy_protocol::protocol::{self, Path, Payload, ReplicaId};
55
use gas::prelude::*;
66
use rivet_api_builder::prelude::*;
7+
use std::time::Instant;
78

8-
use crate::{http_client, replica, utils};
9+
use crate::{http_client, metrics, replica, utils};
910

1011
#[derive(Debug, Serialize, Deserialize)]
1112
pub enum ProposalResult {
@@ -31,6 +32,7 @@ pub struct Input {
3132

3233
#[operation]
3334
pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<ProposalResult> {
35+
let start = Instant::now();
3436
let replica_id = ctx.config().epoxy_replica_id();
3537

3638
// Read config
@@ -71,9 +73,9 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
7173
.await
7274
.context("failed deciding path")?;
7375

74-
match path {
76+
let res = match path {
7577
Path::PathFast(protocol::PathFast { payload }) => {
76-
commit(ctx, &config, replica_id, payload, input.purge_cache).await
78+
commit(ctx, &config, replica_id, payload, input.purge_cache).await?
7779
}
7880
Path::PathSlow(protocol::PathSlow { payload }) => {
7981
run_paxos_accept(
@@ -84,9 +86,20 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
8486
payload,
8587
input.purge_cache,
8688
)
87-
.await
89+
.await?
8890
}
89-
}
91+
};
92+
93+
metrics::PROPOSAL_DURATION.observe(start.elapsed().as_secs_f64());
94+
metrics::PROPOSALS_TOTAL
95+
.with_label_values(&[if let ProposalResult::Committed = res {
96+
"ok"
97+
} else {
98+
"err"
99+
}])
100+
.inc();
101+
102+
Ok(res)
90103
}
91104

92105
#[tracing::instrument(skip_all)]

engine/packages/epoxy/src/replica/ballot.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use epoxy_protocol::protocol;
33
use universaldb::Transaction;
44
use universaldb::utils::{FormalKey, IsolationLevel::*};
55

6-
use crate::keys;
6+
use crate::{keys, metrics};
77

88
/// Get the current ballot for this replica
99
#[tracing::instrument(skip_all)]
@@ -50,6 +50,9 @@ pub async fn increment_ballot(
5050

5151
tx.set(&packed_key, &serialized);
5252

53+
metrics::BALLOT_EPOCH.set(current_ballot.epoch as i64);
54+
metrics::BALLOT_NUMBER.set(current_ballot.ballot as i64);
55+
5356
Ok(current_ballot)
5457
}
5558

@@ -121,6 +124,10 @@ pub async fn validate_and_update_ballot_for_instance(
121124
tracing::debug!(?ballot, ?instance, "updated highest ballot for instance");
122125
}
123126

127+
if !is_valid {
128+
metrics::BALLOT_REJECTIONS_TOTAL.inc();
129+
}
130+
124131
Ok(BallotValidationResult {
125132
is_valid,
126133
incoming_ballot: ballot.clone(),

engine/packages/epoxy/src/replica/lead_consensus.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use epoxy_protocol::protocol;
33
use universaldb::Transaction;
44
use universaldb::utils::{FormalKey, IsolationLevel::*};
55

6-
use crate::keys;
76
use crate::replica::{ballot, utils};
7+
use crate::{keys, metrics};
88

99
#[tracing::instrument(skip_all)]
1010
pub async fn lead_consensus(
@@ -55,6 +55,8 @@ pub async fn lead_consensus(
5555
};
5656
crate::replica::update_log(tx, replica_id, log_entry, &instance).await?;
5757

58+
metrics::INSTANCE_NUMBER.set(instance.slot_id as i64);
59+
5860
// Return payload
5961
Ok(protocol::Payload {
6062
proposal,

engine/packages/epoxy/src/replica/message_request.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,47 @@ use anyhow::*;
22
use epoxy_protocol::protocol::{self};
33
use gas::prelude::*;
44
use rivet_api_builder::prelude::*;
5+
use std::time::Instant;
56

6-
use crate::{ops, replica};
7+
use crate::{metrics, ops, replica};
78

89
#[tracing::instrument(skip_all)]
910
pub async fn message_request(
1011
ctx: &ApiCtx,
1112
request: protocol::Request,
13+
) -> Result<protocol::Response> {
14+
let start = Instant::now();
15+
let request_type = match &request.kind {
16+
protocol::RequestKind::UpdateConfigRequest(_) => "update_config",
17+
protocol::RequestKind::PrepareRequest(_) => "prepare",
18+
protocol::RequestKind::PreAcceptRequest(_) => "pre_accept",
19+
protocol::RequestKind::AcceptRequest(_) => "accept",
20+
protocol::RequestKind::CommitRequest(_) => "commit",
21+
protocol::RequestKind::DownloadInstancesRequest(_) => "download_instances",
22+
protocol::RequestKind::HealthCheckRequest => "health_check",
23+
protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => {
24+
"coordinator_update_replica_status"
25+
}
26+
protocol::RequestKind::BeginLearningRequest(_) => "begin_learning",
27+
protocol::RequestKind::KvGetRequest(_) => "kv_get",
28+
protocol::RequestKind::KvPurgeRequest(_) => "kv_purge",
29+
};
30+
let res = message_request_inner(ctx, request).await;
31+
32+
metrics::REQUEST_DURATION
33+
.with_label_values(&[request_type])
34+
.observe(start.elapsed().as_secs_f64());
35+
metrics::REQUESTS_TOTAL
36+
.with_label_values(&[request_type, if res.is_ok() { "ok" } else { "err" }])
37+
.inc();
38+
39+
res
40+
}
41+
42+
#[tracing::instrument(skip_all)]
43+
async fn message_request_inner(
44+
ctx: &ApiCtx,
45+
request: protocol::Request,
1246
) -> Result<protocol::Response> {
1347
let current_replica_id = ctx.config().epoxy_replica_id();
1448

engine/packages/epoxy/src/replica/messages/accept.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{Result, ensure};
22
use epoxy_protocol::protocol;
33
use universaldb::Transaction;
44

5-
use crate::replica::ballot;
5+
use crate::{metrics, replica::ballot};
66

77
#[tracing::instrument(skip_all)]
88
pub async fn accept(
@@ -24,6 +24,13 @@ pub async fn accept(
2424
let validation =
2525
ballot::validate_and_update_ballot_for_instance(tx, replica_id, &current_ballot, &instance)
2626
.await?;
27+
28+
if !validation.is_valid {
29+
metrics::ACCEPT_TOTAL
30+
.with_label_values(&["invalid_ballot"])
31+
.inc();
32+
}
33+
2734
ensure!(
2835
validation.is_valid,
2936
"ballot validation failed for accept: incoming ballot {:?} is not greater than stored ballot {:?} for instance {:?} (comparison: {:?})",
@@ -43,6 +50,8 @@ pub async fn accept(
4350
};
4451
crate::replica::update_log(tx, replica_id, log_entry, &instance).await?;
4552

53+
metrics::ACCEPT_TOTAL.with_label_values(&["ok"]).inc();
54+
4655
// EPaxos Step 19
4756
Ok(protocol::AcceptResponse {
4857
payload: protocol::AcceptOKPayload { proposal, instance },

0 commit comments

Comments
 (0)