Skip to content

Commit c4782a5

Browse files
authored
Merge pull request #1081 from openmina/rpc/status/service_queues
Status Rpc: extend response to include current service queues
2 parents ad8588b + 1281ce2 commit c4782a5

File tree

18 files changed

+275
-85
lines changed

18 files changed

+275
-85
lines changed

core/src/channels.rs

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,116 @@
1-
pub use tokio::sync::{mpsc, oneshot};
1+
pub use tokio::sync::oneshot;
2+
3+
pub mod mpsc {
4+
use std::{
5+
sync::{Arc, Weak},
6+
task::{Context, Poll},
7+
};
8+
use tokio::sync::mpsc::error::*;
9+
pub use tokio::sync::mpsc::{self, *};
10+
11+
pub struct UnboundedSender<T>(mpsc::UnboundedSender<T>, Arc<()>);
12+
pub struct UnboundedReceiver<T>(mpsc::UnboundedReceiver<T>);
13+
14+
pub type TrackedUnboundedSender<T> = UnboundedSender<Tracked<T>>;
15+
pub type TrackedUnboundedReceiver<T> = UnboundedReceiver<Tracked<T>>;
16+
17+
#[allow(dead_code)]
18+
pub struct Tracked<T>(pub T, pub Tracker);
19+
#[allow(dead_code)]
20+
pub struct Tracker(Weak<()>);
21+
22+
impl<T> std::fmt::Debug for UnboundedSender<T> {
23+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24+
write!(f, "{:?} (len: {})", self.0, self.len())
25+
}
26+
}
27+
28+
impl<T> std::fmt::Debug for UnboundedReceiver<T> {
29+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30+
write!(f, "{:?} (len: {})", self.0, self.len())
31+
}
32+
}
33+
34+
impl<T> Clone for UnboundedSender<T> {
35+
fn clone(&self) -> Self {
36+
Self(self.0.clone(), self.1.clone())
37+
}
38+
}
39+
40+
impl<T> std::ops::Deref for Tracked<T> {
41+
type Target = T;
42+
43+
fn deref(&self) -> &Self::Target {
44+
&self.0
45+
}
46+
}
47+
48+
impl<T> std::ops::DerefMut for Tracked<T> {
49+
fn deref_mut(&mut self) -> &mut Self::Target {
50+
&mut self.0
51+
}
52+
}
53+
54+
impl<T> UnboundedSender<T> {
55+
pub fn is_empty(&self) -> bool {
56+
self.len() == 0
57+
}
58+
59+
pub fn len(&self) -> usize {
60+
Arc::weak_count(&self.1)
61+
}
62+
63+
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
64+
self.0.send(message)
65+
}
66+
}
67+
68+
impl<T> TrackedUnboundedSender<T> {
69+
pub fn tracked_send(&self, message: T) -> Result<(), SendError<T>> {
70+
let msg = Tracked(message, Tracker(Arc::downgrade(&self.1)));
71+
self.send(msg).map_err(|err| SendError(err.0 .0))
72+
}
73+
}
74+
75+
impl<T> UnboundedReceiver<T> {
76+
pub fn is_empty(&self) -> bool {
77+
self.0.is_empty()
78+
}
79+
80+
pub fn len(&self) -> usize {
81+
self.0.len()
82+
}
83+
84+
pub async fn recv(&mut self) -> Option<T> {
85+
self.0.recv().await
86+
}
87+
88+
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
89+
self.0.try_recv()
90+
}
91+
92+
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
93+
self.0.poll_recv(cx)
94+
}
95+
96+
pub fn blocking_recv(&mut self) -> Option<T> {
97+
self.0.blocking_recv()
98+
}
99+
}
100+
101+
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
102+
let (tx, rx) = mpsc::unbounded_channel();
103+
104+
(UnboundedSender(tx, Arc::new(())), UnboundedReceiver(rx))
105+
}
106+
107+
pub fn tracked_unbounded_channel<T>(
108+
) -> (UnboundedSender<Tracked<T>>, UnboundedReceiver<Tracked<T>>) {
109+
let (tx, rx) = mpsc::unbounded_channel();
110+
111+
(UnboundedSender(tx, Arc::new(())), UnboundedReceiver(rx))
112+
}
113+
}
2114

3115
pub mod broadcast {
4116
pub use tokio::sync::broadcast::*;

node/common/src/service/block_producer/mod.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use crate::EventSender;
2020
pub struct BlockProducerService {
2121
provers: Option<BlockProver>,
2222
keypair: AccountSecretKey,
23-
vrf_evaluation_sender: mpsc::UnboundedSender<VrfEvaluatorInput>,
24-
prove_sender: mpsc::UnboundedSender<(
23+
vrf_evaluation_sender: mpsc::TrackedUnboundedSender<VrfEvaluatorInput>,
24+
prove_sender: mpsc::TrackedUnboundedSender<(
2525
BlockProver,
2626
StateHash,
2727
Box<ProverExtendBlockchainInputStableV2>,
@@ -31,8 +31,8 @@ pub struct BlockProducerService {
3131
impl BlockProducerService {
3232
pub fn new(
3333
keypair: AccountSecretKey,
34-
vrf_evaluation_sender: mpsc::UnboundedSender<VrfEvaluatorInput>,
35-
prove_sender: mpsc::UnboundedSender<(
34+
vrf_evaluation_sender: mpsc::TrackedUnboundedSender<VrfEvaluatorInput>,
35+
prove_sender: mpsc::TrackedUnboundedSender<(
3636
BlockProver,
3737
StateHash,
3838
Box<ProverExtendBlockchainInputStableV2>,
@@ -80,18 +80,27 @@ impl BlockProducerService {
8080
pub fn keypair(&self) -> AccountSecretKey {
8181
self.keypair.clone()
8282
}
83+
84+
pub fn vrf_pending_requests(&self) -> usize {
85+
self.vrf_evaluation_sender.len()
86+
}
87+
88+
pub fn prove_pending_requests(&self) -> usize {
89+
self.prove_sender.len()
90+
}
8391
}
8492

8593
fn prover_loop(
8694
keypair: AccountSecretKey,
8795
event_sender: EventSender,
88-
mut rx: mpsc::UnboundedReceiver<(
96+
mut rx: mpsc::TrackedUnboundedReceiver<(
8997
BlockProver,
9098
StateHash,
9199
Box<ProverExtendBlockchainInputStableV2>,
92100
)>,
93101
) {
94-
while let Some((provers, block_hash, mut input)) = rx.blocking_recv() {
102+
while let Some(msg) = rx.blocking_recv() {
103+
let (provers, block_hash, mut input) = msg.0;
95104
let res = prove(provers, &mut input, &keypair, false).map_err(|err| format!("{err:?}"));
96105
if let Err(error) = &res {
97106
openmina_core::error!(message = "Block proof failed", error = format!("{error}"));
@@ -160,7 +169,7 @@ impl node::service::BlockProducerService for crate::NodeService {
160169
.as_ref()
161170
.expect("prove shouldn't be requested if block producer isn't initialized")
162171
.prove_sender
163-
.send((provers, block_hash, input));
172+
.tracked_send((provers, block_hash, input));
164173
}
165174

166175
fn with_producer_keypair<T>(&self, f: impl FnOnce(&AccountSecretKey) -> T) -> Option<T> {

node/common/src/service/block_producer/vrf_evaluator.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use node::{
55
vrf_evaluator::{VrfEvaluationOutputWithHash, VrfEvaluatorInput},
66
BlockProducerEvent,
77
},
8-
core::channels::mpsc::{UnboundedReceiver, UnboundedSender},
8+
core::channels::mpsc::{TrackedUnboundedReceiver, UnboundedSender},
99
event_source::Event,
1010
};
1111
use vrf::{VrfEvaluationInput, VrfEvaluationOutput};
@@ -14,7 +14,7 @@ use crate::NodeService;
1414

1515
pub fn vrf_evaluator(
1616
event_sender: UnboundedSender<Event>,
17-
mut vrf_evaluation_receiver: UnboundedReceiver<VrfEvaluatorInput>,
17+
mut vrf_evaluation_receiver: TrackedUnboundedReceiver<VrfEvaluatorInput>,
1818
keypair: Keypair,
1919
) {
2020
while let Some(vrf_evaluator_input) = vrf_evaluation_receiver.blocking_recv() {
@@ -28,7 +28,7 @@ pub fn vrf_evaluator(
2828
global_slot,
2929
total_currency,
3030
staking_ledger_hash: _,
31-
} = &vrf_evaluator_input;
31+
} = &*vrf_evaluator_input;
3232

3333
let vrf_result = delegator_table
3434
.iter()
@@ -73,7 +73,7 @@ impl node::block_producer_effectful::vrf_evaluator_effectful::BlockProducerVrfEv
7373
{
7474
fn evaluate(&mut self, data: VrfEvaluatorInput) {
7575
if let Some(bp) = self.block_producer.as_mut() {
76-
let _ = bp.vrf_evaluation_sender.send(data);
76+
let _ = bp.vrf_evaluation_sender.tracked_send(data);
7777
}
7878
}
7979
}

node/common/src/service/event_receiver.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ pub struct EventReceiver {
99
}
1010

1111
impl EventReceiver {
12+
pub fn is_empty(&self) -> bool {
13+
!self.has_next()
14+
}
15+
16+
pub fn len(&self) -> usize {
17+
self.rx.len() + self.queue.len()
18+
}
19+
1220
/// If `Err(())`, `mpsc::Sender` for this channel was dropped.
1321
pub async fn wait_for_events(&mut self) -> Result<(), ()> {
1422
if !self.queue.is_empty() {
@@ -19,17 +27,8 @@ impl EventReceiver {
1927
Ok(())
2028
}
2129

22-
pub fn has_next(&mut self) -> bool {
23-
if self.queue.is_empty() {
24-
if let Some(event) = self.try_next() {
25-
self.queue.push(event);
26-
true
27-
} else {
28-
false
29-
}
30-
} else {
31-
true
32-
}
30+
pub fn has_next(&self) -> bool {
31+
!self.queue.is_empty() || !self.rx.is_empty()
3332
}
3433

3534
pub fn try_next(&mut self) -> Option<Event> {

node/common/src/service/p2p.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl webrtc::P2pServiceWebrtc for NodeService {
3232
self.event_sender()
3333
}
3434

35-
fn cmd_sender(&self) -> &mpsc::UnboundedSender<webrtc::Cmd> {
35+
fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<webrtc::Cmd> {
3636
&self.p2p.webrtc.cmd_sender
3737
}
3838

node/common/src/service/service.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub struct NodeService {
3838
pub event_sender: EventSender,
3939
pub event_receiver: EventReceiver,
4040

41-
pub snark_block_proof_verify: mpsc::UnboundedSender<SnarkBlockVerifyArgs>,
41+
pub snark_block_proof_verify: mpsc::TrackedUnboundedSender<SnarkBlockVerifyArgs>,
4242

4343
pub ledger_manager: LedgerManager,
4444
pub block_producer: Option<BlockProducerService>,
@@ -142,6 +142,26 @@ impl AsMut<NodeService> for NodeService {
142142
impl redux::Service for NodeService {}
143143

144144
impl node::Service for NodeService {
145+
fn queues(&mut self) -> node::service::Queues {
146+
node::service::Queues {
147+
events: self.event_receiver.len(),
148+
snark_block_verify: self.snark_block_proof_verify.len(),
149+
ledger: self.ledger_manager.pending_calls(),
150+
vrf_evaluator: self
151+
.block_producer
152+
.as_ref()
153+
.map(|v| v.vrf_pending_requests()),
154+
block_prover: self
155+
.block_producer
156+
.as_ref()
157+
.map(|v| v.prove_pending_requests()),
158+
p2p_webrtc: self.p2p.webrtc.pending_cmds(),
159+
#[cfg(feature = "p2p-libp2p")]
160+
p2p_libp2p: self.p2p.mio.pending_cmds(),
161+
rpc: self.rpc.req_receiver().len(),
162+
}
163+
}
164+
145165
fn stats(&mut self) -> Option<&mut Stats> {
146166
self.stats()
147167
}

node/common/src/service/snarks.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,18 @@ pub struct SnarkBlockVerifyArgs {
3737
impl NodeService {
3838
pub fn snark_block_proof_verifier_spawn(
3939
event_sender: EventSender,
40-
) -> mpsc::UnboundedSender<SnarkBlockVerifyArgs> {
41-
let (tx, mut rx) = mpsc::unbounded_channel();
40+
) -> mpsc::TrackedUnboundedSender<SnarkBlockVerifyArgs> {
41+
let (tx, mut rx) = mpsc::tracked_unbounded_channel();
4242
thread::Builder::new()
4343
.name("block_proof_verifier".to_owned())
4444
.spawn(move || {
45-
while let Some(SnarkBlockVerifyArgs {
46-
req_id,
47-
verifier_index,
48-
verifier_srs,
49-
block,
50-
}) = rx.blocking_recv()
51-
{
45+
while let Some(msg) = rx.blocking_recv() {
46+
let SnarkBlockVerifyArgs {
47+
req_id,
48+
verifier_index,
49+
verifier_srs,
50+
block,
51+
} = msg.0;
5252
eprintln!("verify({}) - start", block.hash_ref());
5353
let header = block.header_ref();
5454
let result = {
@@ -90,7 +90,7 @@ impl node::service::SnarkBlockVerifyService for NodeService {
9090
verifier_srs,
9191
block,
9292
};
93-
let _ = self.snark_block_proof_verify.send(args);
93+
let _ = self.snark_block_proof_verify.tracked_send(args);
9494
}
9595
}
9696

node/src/ledger/ledger_manager.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,17 +350,17 @@ pub struct LedgerManager {
350350
}
351351

352352
#[derive(Clone)]
353-
pub(super) struct LedgerCaller(mpsc::UnboundedSender<LedgerRequestWithChan>);
353+
pub(super) struct LedgerCaller(mpsc::TrackedUnboundedSender<LedgerRequestWithChan>);
354354

355355
impl LedgerManager {
356356
pub fn spawn(mut ledger_ctx: LedgerCtx) -> LedgerManager {
357-
let (sender, mut receiver) = mpsc::unbounded_channel();
357+
let (sender, mut receiver) = mpsc::tracked_unbounded_channel();
358358
let caller = LedgerCaller(sender);
359359
let ledger_caller = caller.clone();
360360

361361
let ledger_manager_loop = move || {
362-
while let Some(LedgerRequestWithChan { request, responder }) = receiver.blocking_recv()
363-
{
362+
while let Some(msg) = receiver.blocking_recv() {
363+
let LedgerRequestWithChan { request, responder } = msg.0;
364364
let response = request.handle(&mut ledger_ctx, &ledger_caller, responder.is_some());
365365
match (response, responder) {
366366
(LedgerResponse::Write(resp), None) => {
@@ -395,6 +395,10 @@ impl LedgerManager {
395395
}
396396
}
397397

398+
pub fn pending_calls(&self) -> usize {
399+
self.caller.0.len()
400+
}
401+
398402
pub(super) fn call(&self, request: LedgerRequest) {
399403
self.caller.call(request)
400404
}
@@ -458,7 +462,7 @@ impl LedgerManager {
458462
impl LedgerCaller {
459463
pub fn call(&self, request: LedgerRequest) {
460464
self.0
461-
.send(LedgerRequestWithChan {
465+
.tracked_send(LedgerRequestWithChan {
462466
request,
463467
responder: None,
464468
})
@@ -471,7 +475,7 @@ impl LedgerCaller {
471475
) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
472476
let (responder, receiver) = std::sync::mpsc::sync_channel(0);
473477
self.0
474-
.send(LedgerRequestWithChan {
478+
.tracked_send(LedgerRequestWithChan {
475479
request,
476480
responder: Some(responder),
477481
})

node/src/rpc/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use crate::ledger::write::LedgerWriteKind;
4848
use crate::p2p::connection::incoming::P2pConnectionIncomingInitOpts;
4949
use crate::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts;
5050
use crate::p2p::PeerId;
51+
use crate::service::Queues;
5152
use crate::snark_pool::{JobCommitment, JobSummary};
5253
use crate::stats::actions::{ActionStatsForBlock, ActionStatsSnapshot};
5354
use crate::stats::block_producer::{
@@ -469,6 +470,7 @@ pub struct RpcNodeStatus {
469470
pub current_block_production_attempt: Option<BlockProductionAttempt>,
470471
pub peers: Vec<RpcPeerInfo>,
471472
pub resources_status: RpcNodeStatusResources,
473+
pub service_queues: Queues,
472474
}
473475

474476
#[derive(Serialize, Debug, Clone)]

0 commit comments

Comments
 (0)