Skip to content

Commit 234e089

Browse files
Merge pull request #41 from nxthdr/metrics
add prometheus bmp metrics
2 parents e9aea85 + 3aa0466 commit 234e089

File tree

12 files changed

+242
-114
lines changed

12 files changed

+242
-114
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Conversely, Risotto can be configured to stream updates as is to the event pipel
2626

2727
## Quick Start
2828

29-
The easiest way to use risotto is using Docker.
29+
The easiest way to use Risotto with Docker.
3030

3131
* Create a `risotto.yml` configuration file
3232

integration/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ The `bmp.from_kafka` table is using the ClickHouse [Kafka engine](https://clickh
1616
* Start the environment
1717

1818
```sh
19-
docker compose up --build --force-recreate --renew-anon-volumes
19+
docker compose up -d --build --force-recreate --renew-anon-volumes
2020
```
2121

2222
* Check Risotto state:

integration/config/risotto/risotto.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ kafka:
1313
topic: risotto-updates
1414

1515
state:
16-
enable: false
16+
enable: true
1717
path: /app/state.json
1818
save_interval: 10

risotto-lib/src/lib.rs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
pub mod processor;
22
pub mod state;
33
pub mod state_store;
4+
pub mod statistics;
45
pub mod update;
56

67
use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
78
use bytes::Bytes;
89
use core::net::IpAddr;
10+
use statistics::AsyncStatistics;
911
use std::sync::mpsc::Sender;
1012
use tracing::{debug, error, info, trace};
1113

@@ -18,6 +20,7 @@ use crate::update::{new_metadata, Update};
1820

1921
pub async fn process_bmp_message<T: StateStore>(
2022
state: Option<AsyncState<T>>,
23+
statistics: AsyncStatistics,
2124
tx: Sender<Update>,
2225
router_addr: IpAddr,
2326
router_port: u16,
@@ -32,22 +35,27 @@ pub async fn process_bmp_message<T: StateStore>(
3235
}
3336
};
3437

38+
let mut statistics_lock = statistics.lock().await;
39+
3540
trace!("[{}]:{} - {:?}", router_addr, router_port, message);
41+
statistics_lock.rx_bmp_messages += 1;
3642

3743
// Extract header and peer information
3844
let metadata = new_metadata(router_addr, router_port, &message);
3945

4046
match message.message_body {
4147
BmpMessageBody::InitiationMessage(body) => {
48+
trace!("{:?}", body);
4249
let tlvs_info = body
4350
.tlvs
4451
.iter()
4552
.map(|tlv| tlv.info.clone())
4653
.collect::<Vec<_>>();
47-
info!(
54+
debug!(
4855
"[{}]:{} - InitiationMessage: {:?}",
4956
router_addr, router_port, tlvs_info
5057
);
58+
statistics_lock.rx_bmp_initiation += 1;
5159
// No-Op
5260
}
5361
BmpMessageBody::PeerUpNotification(body) => {
@@ -60,10 +68,11 @@ pub async fn process_bmp_message<T: StateStore>(
6068
return;
6169
}
6270
let metadata = metadata.unwrap();
63-
info!(
71+
debug!(
6472
"[{}]:{} - PeerUpNotification - {}",
6573
metadata.router_addr, metadata.router_port, metadata.peer_addr
6674
);
75+
statistics_lock.rx_bmp_peer_up += 1;
6776
peer_up_notification(state, tx, metadata, body).await;
6877
}
6978
BmpMessageBody::RouteMonitoring(body) => {
@@ -76,10 +85,13 @@ pub async fn process_bmp_message<T: StateStore>(
7685
return;
7786
}
7887
let metadata = metadata.unwrap();
88+
statistics_lock.rx_bmp_route_monitoring += 1;
7989
route_monitoring(state, tx, metadata, body).await;
8090
}
81-
BmpMessageBody::RouteMirroring(_) => {
82-
info!("[{}]:{} - RouteMirroring", router_addr, router_port)
91+
BmpMessageBody::RouteMirroring(body) => {
92+
trace!("{:?}", body);
93+
debug!("[{}]:{} - RouteMirroring", router_addr, router_port);
94+
statistics_lock.rx_bmp_route_mirroring += 1;
8395
// No-Op
8496
}
8597
BmpMessageBody::PeerDownNotification(body) => {
@@ -92,20 +104,25 @@ pub async fn process_bmp_message<T: StateStore>(
92104
return;
93105
}
94106
let metadata = metadata.unwrap();
95-
info!(
107+
debug!(
96108
"[{}]:{} - PeerDownNotification: - {}",
97109
metadata.router_addr, metadata.router_port, metadata.peer_addr
98110
);
111+
statistics_lock.rx_bmp_peer_down += 1;
99112
peer_down_notification(state, tx, metadata, body).await;
100113
}
101114

102-
BmpMessageBody::TerminationMessage(_) => {
103-
info!("[{}]:{} - TerminationMessage", router_addr, router_port)
115+
BmpMessageBody::TerminationMessage(body) => {
116+
trace!("{:?}", body);
117+
info!("[{}]:{} - TerminationMessage", router_addr, router_port);
118+
statistics_lock.rx_bmp_termination += 1;
104119
// No-Op
105120
}
106-
BmpMessageBody::StatsReport(_) => {
107-
info!("[{}]:{} - StatsReport", router_addr, router_port)
121+
BmpMessageBody::StatsReport(body) => {
122+
trace!("{:?}", body);
123+
info!("[{}]:{} - StatsReport", router_addr, router_port);
124+
statistics_lock.rx_bmp_stats_report += 1;
108125
// No-Op
109126
}
110-
}
127+
};
111128
}

risotto-lib/src/processor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub async fn route_monitoring<T: StateStore>(
5050

5151
let mut legitimate_updates = Vec::new();
5252
if let Some(state) = &state {
53-
let mut state_lock = state.lock().unwrap();
53+
let mut state_lock = state.lock().await;
5454
for update in potential_updates {
5555
let is_updated = state_lock
5656
.update(&metadata.router_addr.clone(), &metadata.peer_addr, &update)
@@ -80,7 +80,7 @@ pub async fn peer_down_notification<T: StateStore>(
8080
if let Some(state) = state {
8181
// Remove the peer and the associated updates from the state
8282
// We start by emiting synthetic withdraw updates
83-
let mut state_lock = state.lock().unwrap();
83+
let mut state_lock = state.lock().await;
8484

8585
let mut synthetic_updates = Vec::new();
8686
let updates = state_lock

risotto-lib/src/state.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use serde::{Deserialize, Serialize};
55
use std::error::Error;
66
use std::hash::{Hash, Hasher};
77
use std::sync::mpsc::Sender;
8-
use std::sync::{Arc, Mutex};
8+
use std::sync::Arc;
99
use std::time::Duration;
10+
use tokio::sync::Mutex;
1011
use tokio::time::sleep;
1112
use tracing::{info, trace};
1213

@@ -125,7 +126,7 @@ pub async fn peer_up_withdraws_handler<T: StateStore>(
125126
metadata.router_addr, metadata.router_port, metadata.peer_addr, startup, sleep_time
126127
);
127128

128-
let state_lock = state.lock().unwrap();
129+
let state_lock = state.lock().await;
129130
let timed_prefixes = state_lock
130131
.store
131132
.get_updates_by_peer(&metadata.router_addr, &metadata.peer_addr);
@@ -149,7 +150,7 @@ pub async fn peer_up_withdraws_handler<T: StateStore>(
149150
synthetic_updates.len()
150151
);
151152

152-
let mut state_lock = state.lock().unwrap();
153+
let mut state_lock = state.lock().await;
153154
for update in &mut synthetic_updates {
154155
trace!("{:?}", update);
155156

risotto-lib/src/statistics.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use std::sync::Arc;
2+
use tokio::sync::Mutex;
3+
4+
pub type AsyncStatistics = Arc<Mutex<ProcessorStatistics>>;
5+
6+
pub fn new_statistics() -> AsyncStatistics {
7+
Arc::new(Mutex::new(ProcessorStatistics::default()))
8+
}
9+
10+
pub struct ProcessorStatistics {
11+
pub rx_bmp_messages: usize,
12+
pub rx_bmp_initiation: usize,
13+
pub rx_bmp_peer_up: usize,
14+
pub rx_bmp_route_monitoring: usize,
15+
pub rx_bmp_route_mirroring: usize,
16+
pub rx_bmp_peer_down: usize,
17+
pub rx_bmp_termination: usize,
18+
pub rx_bmp_stats_report: usize,
19+
}
20+
21+
impl Default for ProcessorStatistics {
22+
fn default() -> Self {
23+
Self {
24+
rx_bmp_messages: 0,
25+
rx_bmp_initiation: 0,
26+
rx_bmp_peer_up: 0,
27+
rx_bmp_route_monitoring: 0,
28+
rx_bmp_route_mirroring: 0,
29+
rx_bmp_peer_down: 0,
30+
rx_bmp_termination: 0,
31+
rx_bmp_stats_report: 0,
32+
}
33+
}
34+
}

risotto/src/api.rs

Lines changed: 63 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
use axum::{extract::State as AxumState, routing::get, Json, Router};
22
use core::net::IpAddr;
3-
use metrics::{Key, Label, Recorder};
4-
use metrics_exporter_prometheus::PrometheusBuilder;
3+
use metrics::{counter, gauge, Label};
4+
use metrics_exporter_prometheus::PrometheusHandle;
55
use serde::{Deserialize, Serialize};
66

7-
use risotto_lib::{state::AsyncState, state_store::store::StateStore};
8-
9-
static METADATA: metrics::Metadata =
10-
metrics::Metadata::new(module_path!(), metrics::Level::INFO, Some(module_path!()));
7+
use risotto_lib::{state::AsyncState, state_store::store::StateStore, statistics::AsyncStatistics};
118

129
#[derive(Debug, Serialize, Deserialize)]
1310
struct APIRouter {
@@ -24,28 +21,38 @@ struct APIPeer {
2421

2522
struct AppState<T: StateStore> {
2623
state: Option<AsyncState<T>>,
24+
statistics: AsyncStatistics,
25+
prom_handle: PrometheusHandle,
2726
}
2827

2928
impl<T: StateStore> Clone for AppState<T> {
3029
fn clone(&self) -> Self {
3130
Self {
3231
state: self.state.clone(),
32+
statistics: self.statistics.clone(),
33+
prom_handle: self.prom_handle.clone(),
3334
}
3435
}
3536
}
3637

37-
pub fn app<T: StateStore>(state: Option<AsyncState<T>>) -> Router {
38-
let app_state = AppState { state };
39-
38+
pub fn app<T: StateStore>(
39+
state: Option<AsyncState<T>>,
40+
statistics: AsyncStatistics,
41+
prom_handle: PrometheusHandle,
42+
) -> Router {
43+
let app_state = AppState {
44+
state,
45+
statistics,
46+
prom_handle,
47+
};
4048
Router::new()
4149
.route("/", get(root).with_state(app_state.clone()))
4250
.route("/metrics", get(metrics).with_state(app_state.clone()))
4351
}
4452

4553
async fn format<T: StateStore>(state: AsyncState<T>) -> Vec<APIRouter> {
4654
let mut api_routers: Vec<APIRouter> = Vec::new();
47-
let state = state.lock().unwrap();
48-
55+
let state = state.lock().await;
4956
for (router_addr, peer_addr, update_prefix) in state.get_all().unwrap() {
5057
// Find the router in the list of routers
5158
let mut router = None;
@@ -102,7 +109,7 @@ async fn format<T: StateStore>(state: AsyncState<T>) -> Vec<APIRouter> {
102109
}
103110

104111
async fn root<T: StateStore>(
105-
AxumState(AppState { state }): AxumState<AppState<T>>,
112+
AxumState(AppState { state, .. }): AxumState<AppState<T>>,
106113
) -> Json<Vec<APIRouter>> {
107114
match state.as_ref() {
108115
Some(state) => {
@@ -113,45 +120,52 @@ async fn root<T: StateStore>(
113120
}
114121
}
115122

116-
async fn metrics<T: StateStore>(AxumState(AppState { state }): AxumState<AppState<T>>) -> String {
117-
if state.is_none() {
118-
return String::new();
119-
}
120-
121-
let state = state.unwrap();
122-
123-
let recorder = PrometheusBuilder::new().build_recorder();
124-
let api_routers = format(state).await;
125-
126-
recorder.describe_gauge(
127-
"risotto_bgp_peers".into(),
128-
None,
129-
"Number of BGP peers per router".into(),
130-
);
131-
for api_router in &api_routers {
132-
let labels = vec![Label::new("router", api_router.router_addr.to_string())];
133-
let key = Key::from_parts("risotto_bgp_peers", labels);
134-
recorder
135-
.register_gauge(&key, &METADATA)
136-
.set(api_router.peers.len() as f64);
137-
}
123+
async fn metrics<T: StateStore>(
124+
AxumState(AppState {
125+
state,
126+
statistics,
127+
prom_handle,
128+
}): AxumState<AppState<T>>,
129+
) -> String {
130+
// Set the state metrics if enabled
131+
if !state.is_none() {
132+
let state = state.unwrap();
133+
let api_routers = format(state).await;
134+
135+
for api_router in &api_routers {
136+
let labels = vec![Label::new("router", api_router.router_addr.to_string())];
137+
gauge!("risotto_state_bgp_peers", labels).set(api_router.peers.len() as f64);
138+
}
138139

139-
recorder.describe_gauge(
140-
"risotto_bgp_updates".into(),
141-
None,
142-
"Number of BGP updates per (router, peer)".into(),
143-
);
144-
for api_router in &api_routers {
145-
for api_peer in &api_router.peers {
146-
let total = api_peer.ipv4 + api_peer.ipv6;
147-
let labels = vec![
148-
Label::new("router", api_router.router_addr.to_string()),
149-
Label::new("peer", api_peer.peer_addr.to_string()),
150-
];
151-
let key = Key::from_parts("risotto_bgp_updates", labels);
152-
recorder.register_gauge(&key, &METADATA).set(total as f64);
140+
for api_router in &api_routers {
141+
for api_peer in &api_router.peers {
142+
let total = api_peer.ipv4 + api_peer.ipv6;
143+
let labels = vec![
144+
Label::new("router", api_router.router_addr.to_string()),
145+
Label::new("peer", api_peer.peer_addr.to_string()),
146+
];
147+
gauge!("risotto_state_bgp_updates", labels).set(total as f64);
148+
}
153149
}
154150
}
155151

156-
recorder.handle().render()
152+
// Set the statistics metrics
153+
let statistics = statistics.lock().await;
154+
let metric_name = "risotto_bmp_messages_total";
155+
counter!(metric_name, vec![Label::new("type", "initiation")])
156+
.absolute(statistics.rx_bmp_initiation as u64);
157+
counter!(metric_name, vec![Label::new("type", "peer_up")])
158+
.absolute(statistics.rx_bmp_peer_up as u64);
159+
counter!(metric_name, vec![Label::new("type", "route_monitoring")])
160+
.absolute(statistics.rx_bmp_route_monitoring as u64);
161+
counter!(metric_name, vec![Label::new("type", "route_mirroring")])
162+
.absolute(statistics.rx_bmp_route_mirroring as u64);
163+
counter!(metric_name, vec![Label::new("type", "peer_down")])
164+
.absolute(statistics.rx_bmp_peer_down as u64);
165+
counter!(metric_name, vec![Label::new("type", "termination")])
166+
.absolute(statistics.rx_bmp_termination as u64);
167+
counter!(metric_name, vec![Label::new("type", "stats_report")])
168+
.absolute(statistics.rx_bmp_stats_report as u64);
169+
170+
prom_handle.render()
157171
}

0 commit comments

Comments
 (0)