Skip to content

Commit 08f2cb5

Browse files
committed
chore(engine): add support for epoxy debugging
1 parent 164c0eb commit 08f2cb5

File tree

10 files changed

+652
-13
lines changed

10 files changed

+652
-13
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-peer/Cargo.toml

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,25 @@ license.workspace = true
99
anyhow.workspace = true
1010
axum.workspace = true
1111
base64.workspace = true
12-
gas.workspace = true
1312
epoxy.workspace = true
14-
futures-util.workspace = true
13+
epoxy-protocol.workspace = true
14+
gas.workspace = true
15+
indexmap.workspace = true
16+
namespace.workspace = true
17+
pegboard.workspace = true
18+
pegboard-actor-kv.workspace = true
1519
rivet-api-builder.workspace = true
1620
rivet-api-types.workspace = true
1721
rivet-api-util.workspace = true
1822
rivet-config.workspace = true
1923
rivet-error.workspace = true
2024
rivet-pools.workspace = true
21-
rivet-util.workspace = true
2225
rivet-types.workspace = true
26+
rivet-util.workspace = true
2327
serde.workspace = true
2428
serde_json.workspace = true
25-
indexmap.workspace = true
26-
27-
namespace.workspace = true
28-
pegboard.workspace = true
29-
pegboard-actor-kv.workspace = true
3029
tokio.workspace = true
3130
tracing.workspace = true
3231
universalpubsub.workspace = true
33-
uuid.workspace = true
3432
utoipa.workspace = true
33+
uuid.workspace = true

engine/packages/api-peer/src/internal.rs

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use anyhow::*;
2+
use epoxy_protocol::protocol::{ReplicaId, SlotId};
23
use gas::prelude::*;
4+
use indexmap::IndexMap;
35
use rivet_api_builder::ApiCtx;
46
use serde::{Deserialize, Serialize};
57
use universalpubsub::PublishOpts;
@@ -167,3 +169,312 @@ pub async fn set_epoxy_state(
167169

168170
Ok(SetEpoxyStateResponse {})
169171
}
172+
173+
#[derive(Serialize, Deserialize)]
174+
pub struct GetEpoxyReplicaDebugResponse {
175+
pub config: epoxy::types::ClusterConfig,
176+
pub computed: EpoxyReplicaDebugComputed,
177+
pub state: EpoxyReplicaDebugState,
178+
}
179+
180+
#[derive(Serialize, Deserialize)]
181+
pub struct EpoxyReplicaDebugComputed {
182+
pub this_replica_id: ReplicaId,
183+
pub this_replica_status: Option<epoxy::types::ReplicaStatus>,
184+
pub is_coordinator: bool,
185+
pub quorum_members: Vec<ReplicaId>,
186+
pub quorum_sizes: EpoxyQuorumSizes,
187+
pub replica_counts: EpoxyReplicaCounts,
188+
}
189+
190+
#[derive(Serialize, Deserialize)]
191+
pub struct EpoxyQuorumSizes {
192+
pub fast: usize,
193+
pub slow: usize,
194+
pub all: usize,
195+
pub any: usize,
196+
}
197+
198+
#[derive(Serialize, Deserialize)]
199+
pub struct EpoxyReplicaCounts {
200+
pub active: usize,
201+
pub learning: usize,
202+
pub joining: usize,
203+
}
204+
205+
#[derive(Serialize, Deserialize)]
206+
pub struct EpoxyReplicaDebugState {
207+
pub ballot: EpoxyBallot,
208+
pub instance_number: u64,
209+
}
210+
211+
#[derive(Serialize, Deserialize, Clone)]
212+
pub struct EpoxyBallot {
213+
pub epoch: u64,
214+
pub ballot: u64,
215+
pub replica_id: ReplicaId,
216+
}
217+
218+
/// Returns debug information for this epoxy replica including cluster config, computed values, and state.
219+
///
220+
/// This reads from the replica's local UDB state, which may differ from the coordinator's
221+
/// view if the replica hasn't been reconfigured yet.
222+
pub async fn get_epoxy_replica_debug(
223+
ctx: ApiCtx,
224+
_path: (),
225+
_query: (),
226+
) -> Result<GetEpoxyReplicaDebugResponse> {
227+
let replica_id = ctx.config().epoxy_replica_id();
228+
229+
let (config, ballot, instance_number) = ctx
230+
.udb()?
231+
.run(|tx| async move {
232+
let config = epoxy::utils::read_config(&tx, replica_id).await?;
233+
let ballot = epoxy::replica::ballot::get_ballot(&tx, replica_id).await?;
234+
let instance_number = epoxy::utils::read_instance_number(&tx, replica_id).await?;
235+
Result::Ok((config, ballot, instance_number))
236+
})
237+
.await?;
238+
239+
// Compute derived values from config
240+
let this_replica_status = config
241+
.replicas
242+
.iter()
243+
.find(|r| r.replica_id == replica_id)
244+
.map(|r| epoxy::types::ReplicaStatus::from(r.status.clone()));
245+
let is_coordinator = config.coordinator_replica_id == replica_id;
246+
247+
let quorum_members = epoxy::utils::get_quorum_members(&config);
248+
let quorum_member_count = quorum_members.len();
249+
250+
let quorum_sizes = EpoxyQuorumSizes {
251+
fast: epoxy::utils::calculate_quorum(quorum_member_count, epoxy::utils::QuorumType::Fast),
252+
slow: epoxy::utils::calculate_quorum(quorum_member_count, epoxy::utils::QuorumType::Slow),
253+
all: epoxy::utils::calculate_quorum(quorum_member_count, epoxy::utils::QuorumType::All),
254+
any: epoxy::utils::calculate_quorum(quorum_member_count, epoxy::utils::QuorumType::Any),
255+
};
256+
257+
let replica_counts = EpoxyReplicaCounts {
258+
active: config
259+
.replicas
260+
.iter()
261+
.filter(|r| matches!(r.status, epoxy_protocol::protocol::ReplicaStatus::Active))
262+
.count(),
263+
learning: config
264+
.replicas
265+
.iter()
266+
.filter(|r| matches!(r.status, epoxy_protocol::protocol::ReplicaStatus::Learning))
267+
.count(),
268+
joining: config
269+
.replicas
270+
.iter()
271+
.filter(|r| matches!(r.status, epoxy_protocol::protocol::ReplicaStatus::Joining))
272+
.count(),
273+
};
274+
275+
Ok(GetEpoxyReplicaDebugResponse {
276+
config: config.into(),
277+
computed: EpoxyReplicaDebugComputed {
278+
this_replica_id: replica_id,
279+
this_replica_status,
280+
is_coordinator,
281+
quorum_members,
282+
quorum_sizes,
283+
replica_counts,
284+
},
285+
state: EpoxyReplicaDebugState {
286+
ballot: EpoxyBallot {
287+
epoch: ballot.epoch,
288+
ballot: ballot.ballot,
289+
replica_id: ballot.replica_id,
290+
},
291+
instance_number,
292+
},
293+
})
294+
}
295+
296+
// MARK: Key debug
297+
#[derive(Serialize, Deserialize, Clone)]
298+
pub struct EpoxyKeyDebugPath {
299+
pub key: String,
300+
}
301+
302+
#[derive(Serialize, Deserialize)]
303+
pub struct GetEpoxyKeyDebugResponse {
304+
pub replica_id: ReplicaId,
305+
pub key: String,
306+
pub instances: Vec<EpoxyKeyInstance>,
307+
pub instances_by_status: IndexMap<String, usize>,
308+
}
309+
310+
#[derive(Serialize, Deserialize, Clone)]
311+
pub struct EpoxyKeyInstance {
312+
pub replica_id: ReplicaId,
313+
pub slot_id: SlotId,
314+
pub log_entry: Option<EpoxyKeyLogEntry>,
315+
}
316+
317+
#[derive(Serialize, Deserialize, Clone)]
318+
pub struct EpoxyKeyLogEntry {
319+
pub status: String,
320+
pub ballot: EpoxyBallot,
321+
pub seq: u64,
322+
pub deps: Vec<EpoxyInstance>,
323+
}
324+
325+
#[derive(Serialize, Deserialize, Clone)]
326+
pub struct EpoxyInstance {
327+
pub replica_id: ReplicaId,
328+
pub slot_id: SlotId,
329+
}
330+
331+
/// Returns debug information for a specific key on this replica.
332+
///
333+
/// Shows all instances that have touched this key and their log entry states.
334+
pub async fn get_epoxy_key_debug(
335+
ctx: ApiCtx,
336+
path: EpoxyKeyDebugPath,
337+
_query: (),
338+
) -> Result<GetEpoxyKeyDebugResponse> {
339+
let replica_id = ctx.config().epoxy_replica_id();
340+
341+
// Decode key from base64
342+
let key_bytes = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &path.key)
343+
.context("invalid base64 key")?;
344+
345+
let instances = ctx
346+
.udb()?
347+
.run(|tx| {
348+
let key_bytes = key_bytes.clone();
349+
async move {
350+
let protocol_instances =
351+
epoxy::utils::read_key_instances(&tx, replica_id, key_bytes).await?;
352+
353+
let mut instances = Vec::new();
354+
for instance in protocol_instances {
355+
let log_entry =
356+
epoxy::utils::read_log_entry(&tx, replica_id, &instance).await?;
357+
instances.push(EpoxyKeyInstance {
358+
replica_id: instance.replica_id,
359+
slot_id: instance.slot_id,
360+
log_entry: log_entry.map(|entry| EpoxyKeyLogEntry {
361+
status: format!("{:?}", entry.state),
362+
ballot: EpoxyBallot {
363+
epoch: entry.ballot.epoch,
364+
ballot: entry.ballot.ballot,
365+
replica_id: entry.ballot.replica_id,
366+
},
367+
seq: entry.seq,
368+
deps: entry
369+
.deps
370+
.into_iter()
371+
.map(|d| EpoxyInstance {
372+
replica_id: d.replica_id,
373+
slot_id: d.slot_id,
374+
})
375+
.collect(),
376+
}),
377+
});
378+
}
379+
380+
Result::Ok(instances)
381+
}
382+
})
383+
.await?;
384+
385+
// Compute instances by status
386+
let mut instances_by_status = IndexMap::new();
387+
for instance in &instances {
388+
let status = instance
389+
.log_entry
390+
.as_ref()
391+
.map(|e| e.status.clone())
392+
.unwrap_or_else(|| "Unknown".to_string());
393+
*instances_by_status.entry(status).or_insert(0) += 1;
394+
}
395+
396+
Ok(GetEpoxyKeyDebugResponse {
397+
replica_id,
398+
key: path.key,
399+
instances,
400+
instances_by_status,
401+
})
402+
}
403+
404+
#[derive(Serialize, Deserialize)]
405+
pub struct GetEpoxyKeyDebugFanoutResponse {
406+
pub replicas: IndexMap<ReplicaId, GetEpoxyKeyDebugResponse>,
407+
pub errors: IndexMap<ReplicaId, String>,
408+
}
409+
410+
/// Returns debug information for a specific key across all replicas in the cluster.
411+
///
412+
/// Fans out to all replicas and aggregates their responses.
413+
pub async fn get_epoxy_key_debug_fanout(
414+
ctx: ApiCtx,
415+
path: EpoxyKeyDebugPath,
416+
_query: (),
417+
) -> Result<GetEpoxyKeyDebugFanoutResponse> {
418+
let replica_id = ctx.config().epoxy_replica_id();
419+
420+
// Get cluster config to find all replicas
421+
let config: epoxy::types::ClusterConfig = ctx
422+
.udb()?
423+
.run(|tx| async move {
424+
let config = epoxy::utils::read_config(&tx, replica_id).await?;
425+
Result::Ok(config.into())
426+
})
427+
.await?;
428+
429+
// Get local response first
430+
let local_response = get_epoxy_key_debug(ctx.clone(), path.clone(), ()).await?;
431+
432+
let mut replicas = IndexMap::new();
433+
let mut errors = IndexMap::new();
434+
435+
// Add local response
436+
replicas.insert(replica_id, local_response);
437+
438+
// Fan out to other replicas
439+
let client = rivet_pools::reqwest::client().await?;
440+
441+
for replica_config in &config.replicas {
442+
if replica_config.replica_id == replica_id {
443+
continue;
444+
}
445+
446+
let url = format!(
447+
"{}/epoxy/replica/key/{}",
448+
replica_config.api_peer_url, path.key
449+
);
450+
451+
let response_result = client.get(&url).send().await;
452+
match response_result {
453+
std::result::Result::Ok(response) => {
454+
if response.status().is_success() {
455+
match response.json::<GetEpoxyKeyDebugResponse>().await {
456+
std::result::Result::Ok(resp) => {
457+
replicas.insert(replica_config.replica_id, resp);
458+
}
459+
std::result::Result::Err(e) => {
460+
errors.insert(
461+
replica_config.replica_id,
462+
format!("failed to parse response: {}", e),
463+
);
464+
}
465+
}
466+
} else {
467+
errors.insert(
468+
replica_config.replica_id,
469+
format!("request failed with status: {}", response.status()),
470+
);
471+
}
472+
}
473+
std::result::Result::Err(e) => {
474+
errors.insert(replica_config.replica_id, format!("request failed: {}", e));
475+
}
476+
}
477+
}
478+
479+
Ok(GetEpoxyKeyDebugFanoutResponse { replicas, errors })
480+
}

engine/packages/api-peer/src/router.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@ pub async fn router(
4242
)
4343
.route("/epoxy/coordinator/state", get(internal::get_epoxy_state))
4444
.route("/epoxy/coordinator/state", post(internal::set_epoxy_state))
45+
.route(
46+
"/epoxy/replica/debug",
47+
get(internal::get_epoxy_replica_debug),
48+
)
49+
.route(
50+
"/epoxy/replica/key/{key}",
51+
get(internal::get_epoxy_key_debug),
52+
)
53+
.route(
54+
"/epoxy/replica/key/{key}/fanout",
55+
get(internal::get_epoxy_key_debug_fanout),
56+
)
4557
.route("/debug/tracing/config", put(internal::set_tracing_config))
4658
})
4759
.await

engine/packages/engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ path = "src/main.rs"
1111

1212
[dependencies]
1313
anyhow.workspace = true
14+
base64.workspace = true
1415
chrono.workspace = true
1516
clap.workspace = true
1617
colored_json.workspace = true

0 commit comments

Comments
 (0)