Skip to content

Commit 02fd30c

Browse files
feat: introduce new worker performance metrics (#95)
1 parent 9547c66 commit 02fd30c

File tree

3 files changed

+133
-17
lines changed

3 files changed

+133
-17
lines changed

balius-runtime/src/lib.rs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use ledgers::LedgerHost;
55
use logging::LoggerHost;
66
use router::Router;
77
use sign::SignerHost;
8-
use std::{collections::HashMap, io::Read, path::Path, sync::Arc};
8+
use std::{collections::HashMap, io::Read, path::Path, sync::Arc, time::Instant};
99
use thiserror::Error;
1010
use tokio::sync::{Mutex, RwLock};
1111
use tracing::{debug, info, warn};
@@ -574,7 +574,8 @@ impl Runtime {
574574
let cursor = self.store.get_worker_cursor(id)?;
575575
debug!(cursor, id, "found cursor for worker");
576576

577-
self.loaded.write().await.insert(
577+
let mut loaded = self.loaded.write().await;
578+
loaded.insert(
578579
id.to_owned(),
579580
Mutex::new(LoadedWorker {
580581
wasm_store,
@@ -584,6 +585,8 @@ impl Runtime {
584585
}),
585586
);
586587

588+
self.metrics.workers_loaded(loaded.len() as u64);
589+
587590
Ok(())
588591
}
589592

@@ -615,7 +618,12 @@ impl Runtime {
615618
}
616619

617620
pub async fn remove_worker(&self, id: &str) -> Result<(), Error> {
618-
match self.loaded.write().await.remove(id) {
621+
let mut loaded = self.loaded.write().await;
622+
let removed = loaded.remove(id);
623+
624+
self.metrics.workers_loaded(loaded.len() as u64);
625+
626+
match removed {
619627
Some(_) => {
620628
info!(worker = id, "Successfully removed worker from runtime.")
621629
}
@@ -632,6 +640,7 @@ impl Runtime {
632640
undo_blocks: &Vec<Block>,
633641
next_block: &Block,
634642
) -> Result<(), Error> {
643+
let start = Instant::now();
635644
info!("applying block");
636645

637646
let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
@@ -640,23 +649,35 @@ impl Runtime {
640649

641650
let mut store_update = self.store.start_atomic_update(log_seq)?;
642651

643-
let update = async |worker: &Mutex<LoadedWorker>| -> Result<String, Error> {
652+
let update = async |worker: &Mutex<LoadedWorker>| -> Result<(String, f64), Error> {
653+
let worker_start = Instant::now();
644654
let mut lock = worker.lock().await;
645655
lock.apply_chain(undo_blocks, next_block).await?;
646656

647-
Ok(lock.wasm_store.data().worker_id.clone())
657+
Ok((
658+
lock.wasm_store.data().worker_id.clone(),
659+
worker_start.elapsed().as_secs_f64() * 1000.0,
660+
))
648661
};
649662
let updates = workers.values().map(update).collect_vec();
650663

651664
join_all(updates)
652665
.await
653666
.into_iter()
654-
.collect::<Result<Vec<String>, _>>()?
667+
.collect::<Result<Vec<(String, f64)>, _>>()?
655668
.iter()
656-
.try_for_each(|x| store_update.update_worker_cursor(x))?;
669+
.try_for_each(|(x, duration)| {
670+
self.metrics.handle_worker_chain_duration_ms(x, *duration);
671+
store_update.update_worker_cursor(x)
672+
})?;
657673

658674
store_update.commit()?;
659675

676+
self.metrics
677+
.handle_chain_duration_ms(start.elapsed().as_secs_f64() * 1000.0);
678+
self.metrics.latest_block_height(next_block.height());
679+
self.metrics.latest_block_slot(next_block.slot());
680+
660681
Ok(())
661682
}
662683

@@ -666,6 +687,7 @@ impl Runtime {
666687
method: &str,
667688
params: Vec<u8>,
668689
) -> Result<wit::Response, Error> {
690+
let start = Instant::now();
669691
let workers = self.loaded.read().await;
670692
let mut worker = workers
671693
.get(worker_id)
@@ -683,6 +705,9 @@ impl Runtime {
683705

684706
let result = worker.dispatch_event(channel, &evt).await;
685707
self.metrics.request(worker_id, method, result.is_ok());
708+
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
709+
self.metrics
710+
.handle_request_duration_ms(worker_id, method, duration_ms);
686711
result
687712
}
688713
}
@@ -818,8 +843,11 @@ impl RuntimeBuilder {
818843
http,
819844
} = this;
820845

846+
let metrics: Arc<metrics::Metrics> = Default::default();
847+
metrics.workers_loaded(0);
848+
821849
Ok(Runtime {
822-
metrics: Default::default(),
850+
metrics,
823851
loaded: Default::default(),
824852
engine,
825853
linker,

balius-runtime/src/metrics.rs

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use opentelemetry::{global, metrics::Counter, KeyValue};
1+
use opentelemetry::{
2+
global,
3+
metrics::{Counter, Gauge, Histogram},
4+
KeyValue,
5+
};
26

37
use crate::{logging::level_to_string, wit::balius::app::logging::Level};
48

@@ -17,6 +21,12 @@ pub struct Metrics {
1721
ledger_read_utxos: Counter<u64>,
1822
ledger_search_utxos: Counter<u64>,
1923
ledger_read_params: Counter<u64>,
24+
workers_loaded: Gauge<u64>,
25+
handle_chain_duration_ms: Histogram<f64>,
26+
handle_request_duration_ms: Histogram<f64>,
27+
handle_worker_chain_duration_ms: Histogram<f64>,
28+
latest_block_height: Gauge<u64>,
29+
latest_block_slot: Gauge<u64>,
2030
}
2131

2232
impl Metrics {
@@ -88,6 +98,48 @@ impl Metrics {
8898
.with_description("Amount of calls to read_params on the ledger interface.")
8999
.build();
90100

101+
let workers_loaded = meter
102+
.u64_gauge("workers_loaded")
103+
.with_description("Current amount of workers loaded into the runtime.")
104+
.build();
105+
106+
let handle_chain_duration_ms = meter
107+
.f64_histogram("handle_chain_duration_ms")
108+
.with_description("Duration to process handle_chain in milliseconds.")
109+
.with_unit("ms")
110+
.with_boundaries(vec![
111+
100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 60000.0, 1200000.0,
112+
])
113+
.build();
114+
115+
let handle_request_duration_ms = meter
116+
.f64_histogram("handle_request_duration_ms")
117+
.with_description("Duration to process handle_request in milliseconds.")
118+
.with_unit("ms")
119+
.with_boundaries(vec![
120+
100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 60000.0, 1200000.0,
121+
])
122+
.build();
123+
124+
let handle_worker_chain_duration_ms = meter
125+
.f64_histogram("handle_worker_chain_duration_ms")
126+
.with_description("Duration for a worker to process apply_chain in milliseconds.")
127+
.with_unit("ms")
128+
.with_boundaries(vec![
129+
100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 60000.0, 1200000.0,
130+
])
131+
.build();
132+
133+
let latest_block_height = meter
134+
.u64_gauge("latest_block_height")
135+
.with_description("Latest block height successfully processed by handle_chain.")
136+
.build();
137+
138+
let latest_block_slot = meter
139+
.u64_gauge("latest_block_slot")
140+
.with_description("Latest block slot successfully processed by handle_chain.")
141+
.build();
142+
91143
Metrics {
92144
requests,
93145
kv_get,
@@ -102,6 +154,12 @@ impl Metrics {
102154
ledger_read_utxos,
103155
ledger_search_utxos,
104156
ledger_read_params,
157+
workers_loaded,
158+
handle_chain_duration_ms,
159+
handle_request_duration_ms,
160+
handle_worker_chain_duration_ms,
161+
latest_block_height,
162+
latest_block_slot,
105163
}
106164
}
107165

@@ -180,6 +238,39 @@ impl Metrics {
180238
self.ledger_read_params
181239
.add(1, &[KeyValue::new("worker", worker_id.to_owned())]);
182240
}
241+
242+
pub fn workers_loaded(&self, count: u64) {
243+
self.workers_loaded.record(count, &[]);
244+
}
245+
246+
pub fn handle_chain_duration_ms(&self, duration_ms: f64) {
247+
self.handle_chain_duration_ms.record(duration_ms, &[]);
248+
}
249+
250+
pub fn handle_request_duration_ms(&self, worker_id: &str, method: &str, duration_ms: f64) {
251+
self.handle_request_duration_ms.record(
252+
duration_ms,
253+
&[
254+
KeyValue::new("worker", worker_id.to_owned()),
255+
KeyValue::new("method", method.to_owned()),
256+
],
257+
);
258+
}
259+
260+
pub fn latest_block_height(&self, height: u64) {
261+
self.latest_block_height.record(height, &[]);
262+
}
263+
264+
pub fn latest_block_slot(&self, slot: u64) {
265+
self.latest_block_slot.record(slot, &[]);
266+
}
267+
268+
pub fn handle_worker_chain_duration_ms(&self, worker_id: &str, duration_ms: f64) {
269+
self.handle_worker_chain_duration_ms.record(
270+
duration_ms,
271+
&[KeyValue::new("worker", worker_id.to_owned())],
272+
);
273+
}
183274
}
184275

185276
impl Default for Metrics {

examples/asteria-tracker/src/lib.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,12 @@ fn handle_utxo(config: sdk::Config<Config>, utxo: sdk::Utxo<Datum>) -> sdk::Work
8787
if let Some(datum) = utxo.utxo.datum {
8888
let p = datum.payload.unwrap().plutus_data.unwrap();
8989

90-
match p {
91-
plutus_data::PlutusData::Constr(x) => {
92-
let mut f = x.fields.iter();
90+
if let plutus_data::PlutusData::Constr(x) = p {
91+
let mut f = x.fields.iter();
9392

94-
pos_x = integer_plutus_field(f.next()).unwrap();
95-
pos_y = integer_plutus_field(f.next()).unwrap();
96-
asset_name = hex::encode(string_plutus_field(f.next()).unwrap());
97-
}
98-
_ => {}
93+
pos_x = integer_plutus_field(f.next()).unwrap();
94+
pos_y = integer_plutus_field(f.next()).unwrap();
95+
asset_name = hex::encode(string_plutus_field(f.next()).unwrap());
9996
}
10097
}
10198

0 commit comments

Comments
 (0)