@@ -17,7 +17,7 @@ use tokio::{
17
17
use tokio_util:: task:: TaskTracker ;
18
18
use tracing:: { debug, warn} ;
19
19
20
- use crate :: config:: SubmitterConfig ;
20
+ use crate :: { config:: SubmitterConfig , metrics :: BuilderMetrics } ;
21
21
22
22
const CACHE_SIZE : usize = 15_000 ;
23
23
const MAX_TASKS : usize = 1000 ;
@@ -29,6 +29,7 @@ pub struct Submitter {
29
29
handler : Handler ,
30
30
committees : Arc < AsyncMutex < CommitteeVec < 2 > > > ,
31
31
task_permits : Arc < Semaphore > ,
32
+ metrics : BuilderMetrics ,
32
33
}
33
34
34
35
impl Drop for Submitter {
@@ -38,7 +39,10 @@ impl Drop for Submitter {
38
39
}
39
40
40
41
impl Submitter {
41
- pub async fn create ( cfg : SubmitterConfig ) -> Self {
42
+ pub fn new < M > ( cfg : SubmitterConfig , metrics : & M ) -> Self
43
+ where
44
+ M : :: metrics:: Metrics ,
45
+ {
42
46
let client = robusta:: Client :: new ( cfg. robusta . clone ( ) ) ;
43
47
let verified = Arc :: new ( Mutex :: new ( BTreeSet :: new ( ) ) ) ;
44
48
let committees = Arc :: new ( AsyncMutex :: new ( CommitteeVec :: new ( cfg. committee . clone ( ) ) ) ) ;
@@ -62,6 +66,7 @@ impl Submitter {
62
66
submitters : TaskTracker :: new ( ) ,
63
67
committees,
64
68
task_permits : Arc :: new ( Semaphore :: new ( MAX_TASKS ) ) ,
69
+ metrics : BuilderMetrics :: new ( metrics) ,
65
70
}
66
71
}
67
72
@@ -77,14 +82,17 @@ impl Submitter {
77
82
let Ok ( permit) = Semaphore :: acquire_owned ( self . task_permits . clone ( ) ) . await else {
78
83
return ;
79
84
} ;
85
+ let num = cb. cert ( ) . data ( ) . num ( ) ;
80
86
debug ! (
81
87
node = %self . public_key( ) ,
82
- num = %cb . cert ( ) . data ( ) . num( ) ,
88
+ num = %num,
83
89
tasks = %self . submitters. len( ) ,
84
90
"creating block handler"
85
91
) ;
86
92
self . submitters
87
93
. spawn ( self . handler . clone ( ) . handle ( permit, cb) ) ;
94
+ self . metrics . block_submit . set ( * num as usize ) ;
95
+ self . metrics . submit_tasks . set ( self . submitters . len ( ) ) ;
88
96
}
89
97
90
98
pub async fn join ( self ) {
@@ -222,6 +230,7 @@ impl Handler {
222
230
#[ cfg( test) ]
223
231
mod tests {
224
232
use bytes:: Bytes ;
233
+ use metrics:: NoMetrics ;
225
234
use multisig:: { Committee , Keypair , PublicKey , Signed , VoteAccumulator } ;
226
235
use timeboost_types:: { Block , BlockInfo , BlockNumber , sailfish:: Round } ;
227
236
use tokio:: task:: JoinSet ;
@@ -296,7 +305,7 @@ mod tests {
296
305
. committee ( committee. clone ( ) )
297
306
. build ( ) ;
298
307
299
- let mut s = Submitter :: create ( scfg) . await ;
308
+ let mut s = Submitter :: new ( scfg, & NoMetrics ) ;
300
309
301
310
tasks. spawn ( async move {
302
311
for _ in 0 ..NODES {
0 commit comments