Skip to content

Commit df82156

Browse files
Merge pull request #29 from nxthdr/add-tests
adding test of processors function
2 parents 7d3db35 + 865871d commit df82156

File tree

8 files changed

+242
-94
lines changed

8 files changed

+242
-94
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44

55
# Risotto
66

7-
Risotto 😋 is a BGP updates collector that gathers BGP updates from routers via the BMP protocol. This repository contains the Risotto collector application and the Risotto library.
7+
Risotto 😋 is a BGP collector that processes BMP protocol messages from routers and publishes updates to Kafka/Redpanda. This repository includes both the Risotto collector application and the Risotto library.
88

9-
The application streams the BGP updates to a Kafka topic, which can be consumed by other components downstream. The library provides all the necessary components to decode BMP messages and produce BGP updates.
9+
The collector application streams BGP updates to a Kafka topic, enabling downstream components to consume them. The library offers essential components for decoding BMP messages and generating BGP updates.
1010

1111
## State Management
1212

@@ -17,7 +17,7 @@ This state addresses two challenges when handling BMP data:
1717

1818
Duplicate announcements could, in theory, be handled by the database, but less data manipulation is better. Instead, Risotto checks each incoming update against its state. If the prefix is already present, the update is discarded.
1919

20-
For Peer Down notifications, Risotto leverages its state to generate synthetic withdraws for the prefixes announced by the downed peer.
20+
Missing withdraws are generated synthetically when receiving Peer Down notifications for the prefixes already announced by the downed peer.
2121

2222
For persistance, Risotto dumps its state at specified interval, and fetches it at startup. Risotto is able to infer any missing withdraws that would have occured during downtime, from the initial peer up flow. This ensures the database remains accurate, even if the collector is restarted. On the other hand, a restart may result in duplicate announcements.
2323
In other words, Risotto guaranties that the database is always in a consistent state, but may contain some duplicate announcements.

risotto-lib/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ bgpkit-parser = { version = "0.11.0", features = ["serde"] }
1818
bytes = "1.9.0"
1919
rand = "0.9.0"
2020
serde = { version = "1.0.217", features = ["derive"] }
21-
tokio = "1.42.0"
21+
tokio = { version = "1.42.0", features = ["macros"] }
2222
tracing = "0.1.41"

risotto-lib/src/lib.rs

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,90 @@
1-
pub mod handler;
1+
pub mod processor;
22
pub mod state;
33
pub mod update;
4+
5+
use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
6+
use bytes::Bytes;
7+
use core::net::IpAddr;
8+
use std::sync::mpsc::Sender;
9+
use tracing::{error, info, trace};
10+
11+
use crate::processor::{
12+
decode_bmp_message, peer_down_notification, peer_up_notification, route_monitoring,
13+
};
14+
use crate::state::AsyncState;
15+
use crate::update::{new_metadata, Update};
16+
17+
pub async fn process_bmp_message(
18+
state: Option<AsyncState>,
19+
tx: Sender<Update>,
20+
router_addr: IpAddr,
21+
router_port: u16,
22+
bytes: &mut Bytes,
23+
) {
24+
// Parse the BMP message
25+
let message = match decode_bmp_message(bytes) {
26+
Ok(message) => message,
27+
Err(e) => {
28+
error!("failed to decode BMP message: {}", e);
29+
return;
30+
}
31+
};
32+
33+
// Extract header and peer information
34+
let metadata = match new_metadata(router_addr, router_port, message.clone()) {
35+
Some(m) => m,
36+
None => return,
37+
};
38+
39+
match message.message_body {
40+
BmpMessageBody::InitiationMessage(_) => {
41+
info!(
42+
"InitiationMessage: {} - {}",
43+
metadata.router_addr, metadata.peer_addr
44+
)
45+
// No-Op
46+
}
47+
BmpMessageBody::PeerUpNotification(body) => {
48+
trace!("{:?}", body);
49+
info!(
50+
"PeerUpNotification: {} - {}",
51+
metadata.router_addr, metadata.peer_addr
52+
);
53+
peer_up_notification(state, tx, metadata, body).await;
54+
}
55+
BmpMessageBody::RouteMonitoring(body) => {
56+
trace!("{:?}", body);
57+
route_monitoring(state, tx, metadata, body).await;
58+
}
59+
BmpMessageBody::RouteMirroring(_) => {
60+
info!(
61+
"RouteMirroring: {} - {}",
62+
metadata.router_addr, metadata.peer_addr
63+
)
64+
// No-Op
65+
}
66+
BmpMessageBody::PeerDownNotification(body) => {
67+
trace!("{:?}", body);
68+
info!(
69+
"PeerDownNotification: {} - {}",
70+
metadata.router_addr, metadata.peer_addr
71+
);
72+
peer_down_notification(state, tx, metadata, body).await;
73+
}
74+
75+
BmpMessageBody::TerminationMessage(_) => {
76+
info!(
77+
"TerminationMessage: {} - {}",
78+
metadata.router_addr, metadata.peer_addr
79+
)
80+
// No-Op
81+
}
82+
BmpMessageBody::StatsReport(_) => {
83+
info!(
84+
"StatsReport: {} - {}",
85+
metadata.router_addr, metadata.peer_addr
86+
)
87+
// No-Op
88+
}
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
use anyhow::Result;
22
use bgpkit_parser::bmp::messages::{PeerDownNotification, PeerUpNotification, RouteMonitoring};
33
use bgpkit_parser::parse_bmp_msg;
4-
use bgpkit_parser::parser::bmp::messages::{BmpMessage, BmpMessageBody};
4+
use bgpkit_parser::parser::bmp::messages::BmpMessage;
55
use bytes::Bytes;
6-
use core::net::IpAddr;
76
use std::sync::mpsc::Sender;
8-
use tracing::{error, info, trace};
7+
use tracing::trace;
98

109
use crate::state::AsyncState;
1110
use crate::state::{peer_up_withdraws_handler, synthesize_withdraw_update};
12-
use crate::update::{decode_updates, new_metadata, new_peer_from_metadata, Update, UpdateMetadata};
11+
use crate::update::{decode_updates, new_peer_from_metadata, Update, UpdateMetadata};
1312

1413
pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
1514
let message = match parse_bmp_msg(bytes) {
@@ -20,7 +19,7 @@ pub fn decode_bmp_message(bytes: &mut Bytes) -> Result<BmpMessage> {
2019
Ok(message)
2120
}
2221

23-
pub async fn process_peer_up_notification(
22+
pub async fn peer_up_notification(
2423
state: Option<AsyncState>,
2524
tx: Sender<Update>,
2625
metadata: UpdateMetadata,
@@ -34,7 +33,7 @@ pub async fn process_peer_up_notification(
3433
}
3534
}
3635

37-
pub async fn process_route_monitoring(
36+
pub async fn route_monitoring(
3837
state: Option<AsyncState>,
3938
tx: Sender<Update>,
4039
metadata: UpdateMetadata,
@@ -66,7 +65,7 @@ pub async fn process_route_monitoring(
6665
}
6766
}
6867

69-
pub async fn process_peer_down_notification(
68+
pub async fn peer_down_notification(
7069
state: Option<AsyncState>,
7170
tx: Sender<Update>,
7271
metadata: UpdateMetadata,
@@ -100,78 +99,3 @@ pub async fn process_peer_down_notification(
10099
}
101100
}
102101
}
103-
104-
pub async fn process_bmp_message(
105-
state: Option<AsyncState>,
106-
tx: Sender<Update>,
107-
router_addr: IpAddr,
108-
router_port: u16,
109-
bytes: &mut Bytes,
110-
) {
111-
// Parse the BMP message
112-
let message = match decode_bmp_message(bytes) {
113-
Ok(message) => message,
114-
Err(e) => {
115-
error!("failed to decode BMP message: {}", e);
116-
return;
117-
}
118-
};
119-
120-
// Extract header and peer information
121-
let metadata = match new_metadata(router_addr, router_port, message.clone()) {
122-
Some(m) => m,
123-
None => return,
124-
};
125-
126-
match message.message_body {
127-
BmpMessageBody::InitiationMessage(_) => {
128-
info!(
129-
"InitiationMessage: {} - {}",
130-
metadata.router_addr, metadata.peer_addr
131-
)
132-
// No-Op
133-
}
134-
BmpMessageBody::PeerUpNotification(body) => {
135-
trace!("{:?}", body);
136-
info!(
137-
"PeerUpNotification: {} - {}",
138-
metadata.router_addr, metadata.peer_addr
139-
);
140-
process_peer_up_notification(state, tx, metadata, body).await;
141-
}
142-
BmpMessageBody::RouteMonitoring(body) => {
143-
trace!("{:?}", body);
144-
process_route_monitoring(state, tx, metadata, body).await;
145-
}
146-
BmpMessageBody::RouteMirroring(_) => {
147-
info!(
148-
"RouteMirroring: {} - {}",
149-
metadata.router_addr, metadata.peer_addr
150-
)
151-
// No-Op
152-
}
153-
BmpMessageBody::PeerDownNotification(body) => {
154-
trace!("{:?}", body);
155-
info!(
156-
"PeerDownNotification: {} - {}",
157-
metadata.router_addr, metadata.peer_addr
158-
);
159-
process_peer_down_notification(state, tx, metadata, body).await;
160-
}
161-
162-
BmpMessageBody::TerminationMessage(_) => {
163-
info!(
164-
"TerminationMessage: {} - {}",
165-
metadata.router_addr, metadata.peer_addr
166-
)
167-
// No-Op
168-
}
169-
BmpMessageBody::StatsReport(_) => {
170-
info!(
171-
"StatsReport: {} - {}",
172-
metadata.router_addr, metadata.peer_addr
173-
)
174-
// No-Op
175-
}
176-
}
177-
}

risotto-lib/src/state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use bgpkit_parser::models::{Origin, Peer as BGPkitPeer};
1+
use bgpkit_parser::models::Peer as BGPkitPeer;
22
use chrono::Utc;
33
use core::net::IpAddr;
44
use rand::Rng;
@@ -223,7 +223,7 @@ pub fn synthesize_withdraw_update(prefix: TimedPrefix, metadata: UpdateMetadata)
223223
prefix_addr: prefix.prefix_addr,
224224
prefix_len: prefix.prefix_len,
225225
announced: false,
226-
origin: Origin::INCOMPLETE,
226+
origin: "INCOMPLETE".to_string(),
227227
path: vec![],
228228
communities: vec![],
229229
is_post_policy: prefix.is_post_policy,

risotto-lib/src/update.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct Update {
3030
pub announced: bool,
3131
pub is_post_policy: bool,
3232
pub is_adj_rib_out: bool,
33-
pub origin: Origin,
33+
pub origin: String,
3434
pub path: Vec<u32>,
3535
pub communities: Vec<(u32, u16)>,
3636
pub synthetic: bool,
@@ -95,7 +95,7 @@ pub fn decode_updates(message: RouteMonitoring, metadata: UpdateMetadata) -> Opt
9595
announced,
9696
is_post_policy: metadata.is_post_policy,
9797
is_adj_rib_out: metadata.is_adj_rib_out,
98-
origin,
98+
origin: origin.to_string(),
9999
path: new_path(path.clone()),
100100
communities: new_communities(&communities.clone()),
101101
synthetic: false,

0 commit comments

Comments
 (0)