Skip to content

Commit 945b34e

Browse files
authored
Filter global traffic (#123)
We are soon going to allow public nodes to follow, however we do not want their network traffic to interfere with the voting. Public nodes should be able to consume and subscribe but not publish. PR allows internal nodes to filter traffic based on a whitelist i.e they will not accept fragments that are not in the whitelist. This allows us to configure and deploy multiple scenarios including a gateway model.
2 parents 8319500 + 80796f0 commit 945b34e

File tree

16 files changed

+505
-5
lines changed

16 files changed

+505
-5
lines changed

src/jormungandr/jormungandr-lib/src/interfaces/config/node.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,8 @@ pub struct P2p {
211211

212212
pub allow_private_addresses: bool,
213213

214+
pub whitelist: Option<Vec<SocketAddr>>,
215+
214216
pub policy: Option<Policy>,
215217

216218
#[serde(skip_serializing_if = "Option::is_none")]

src/jormungandr/jormungandr/src/fragment/process.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ impl Process {
114114
tip,
115115
stats_counter.clone()
116116
);
117-
118117
loop {
119118
tokio::select! {
120119
maybe_msg = input.next() => {

src/jormungandr/jormungandr/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
291291

292292
{
293293
let blockchain_tip = blockchain_tip.clone();
294+
294295
let process = fragment::Process::new(
295296
bootstrapped_node.settings.mempool.pool_max_entries.into(),
296297
bootstrapped_node.settings.mempool.log_max_entries.into(),

src/jormungandr/jormungandr/src/network/p2p/comm.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,11 @@ impl Peers {
666666
}
667667
}
668668

669+
pub async fn get_peer_addr(&self, peer: &NodeId) -> Option<SocketAddr> {
670+
let mut map = self.inner().await;
671+
map.peer_comms(peer).map(|peer| peer.remote_addr())
672+
}
673+
669674
pub async fn refresh_peer_on_gossip(&self, peer: &NodeId) -> bool {
670675
let timestamp = SystemTime::now();
671676
let mut map = self.inner().await;

src/jormungandr/jormungandr/src/network/p2p/comm/peer_map.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ impl PeerMap {
267267
.iter()
268268
.map(|(&id, data)| PeerInfo {
269269
id,
270-
addr: None,
270+
addr: Some(data.comms.remote_addr),
271271
stats: data.stats.clone(),
272272
})
273273
.collect()

src/jormungandr/jormungandr/src/network/subscription.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use chain_network::{
1010
data as net_data,
1111
error::{Code, Error},
1212
};
13+
use futures::executor;
1314
use futures::{future::BoxFuture, prelude::*, ready};
1415
use jormungandr_lib::interfaces::FragmentOrigin;
1516
use std::{
@@ -24,7 +25,7 @@ fn filter_gossip_node(node: &Gossip, config: &Configuration) -> bool {
2425
if config.allow_private_addresses {
2526
node.has_valid_address()
2627
} else {
27-
node.is_global()
28+
!node.is_global()
2829
}
2930
}
3031

@@ -165,6 +166,12 @@ impl FragmentProcessor {
165166
}
166167
}
167168

169+
fn get_ingress_addr(&self) -> Option<std::net::SocketAddr> {
170+
let state = self.global_state.clone();
171+
let node_id = self.node_id;
172+
executor::block_on(state.peers.get_peer_addr(&node_id))
173+
}
174+
168175
fn refresh_stat(&mut self) {
169176
let state = self.global_state.clone();
170177
let node_id = self.node_id;
@@ -301,7 +308,23 @@ impl Sink<net_data::Fragment> for FragmentProcessor {
301308
e
302309
})?;
303310
tracing::debug!(hash = %fragment.hash(), "received fragment");
304-
self.buffered_fragments.push(fragment);
311+
312+
if let Some(whitelist) = &self.global_state.config.whitelist {
313+
match self.get_ingress_addr() {
314+
Some(ingress_addr) => {
315+
if whitelist.contains(&ingress_addr) {
316+
self.buffered_fragments.push(fragment);
317+
} else {
318+
tracing::info!("dropping fragments from {}", ingress_addr);
319+
}
320+
}
321+
None => tracing::warn!("unable to resolve address of ingress client"),
322+
}
323+
} else {
324+
// if no whitelist config, normal behaviour, no filtering
325+
self.buffered_fragments.push(fragment);
326+
}
327+
305328
Ok(())
306329
}
307330

@@ -410,6 +433,7 @@ impl Sink<net_data::Gossip> for GossipProcessor {
410433
let state1 = self.global_state.clone();
411434
let mut mbox = self.mbox.clone();
412435
let node_id = self.node_id;
436+
413437
let fut = future::join(
414438
async move {
415439
let refreshed = state1.peers.refresh_peer_on_gossip(&node_id).await;

src/jormungandr/jormungandr/src/settings/start/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub use jormungandr_lib::interfaces::{Cors, JRpc, LayersConfig, Rest, Tls, Trust
1010
use jormungandr_lib::{interfaces::Mempool, time::Duration};
1111
use multiaddr::Multiaddr;
1212
use serde::{de::Error as _, Deserialize, Deserializer, Serialize, Serializer};
13-
use std::path::PathBuf;
13+
use std::{net::SocketAddr, path::PathBuf};
1414
use tracing::level_filters::LevelFilter;
1515

1616
#[derive(Debug, Deserialize)]
@@ -99,6 +99,8 @@ pub struct P2pConfig {
9999
#[serde(default)]
100100
pub allow_private_addresses: bool,
101101

102+
pub whitelist: Option<Vec<SocketAddr>>,
103+
102104
/// setting for the policy
103105
#[serde(default)]
104106
pub policy: QuarantineConfig,

src/jormungandr/jormungandr/src/settings/start/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ fn generate_network(
328328
.unwrap_or(network::DEFAULT_MAX_CLIENT_CONNECTIONS),
329329
timeout: std::time::Duration::from_secs(15),
330330
allow_private_addresses: p2p.allow_private_addresses,
331+
whitelist: p2p.whitelist,
331332
gossip_interval: p2p
332333
.gossip_interval
333334
.map(|d| d.into())

src/jormungandr/jormungandr/src/settings/start/network.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ pub struct Configuration {
8585
/// Whether to allow non-public IP addresses in gossip
8686
pub allow_private_addresses: bool,
8787

88+
pub whitelist: Option<Vec<SocketAddr>>,
89+
8890
pub gossip_interval: Duration,
8991

9092
pub network_stuck_check: Duration,

src/jormungandr/testing/hersir/src/config/spawn_params.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::{
1919
#[derive(Clone, Debug, Deserialize)]
2020
pub struct SpawnParams {
2121
alias: NodeAlias,
22+
2223
bootstrap_from_peers: Option<bool>,
2324
faketime: Option<FaketimeConfig>,
2425
gossip_interval: Option<Duration>,
@@ -28,6 +29,8 @@ pub struct SpawnParams {
2829
log_level: Option<LogLevel>,
2930
max_bootstrap_attempts: Option<usize>,
3031
max_connections: Option<u32>,
32+
allow_private_addresses: Option<bool>,
33+
whitelist: Option<Vec<SocketAddr>>,
3134
max_inbound_connections: Option<u32>,
3235
mempool: Option<Mempool>,
3336
network_stuck_check: Option<Duration>,
@@ -59,6 +62,8 @@ impl SpawnParams {
5962
log_level: None,
6063
max_bootstrap_attempts: None,
6164
max_connections: None,
65+
allow_private_addresses: None,
66+
whitelist: None,
6267
max_inbound_connections: None,
6368
mempool: None,
6469
network_stuck_check: None,
@@ -138,6 +143,16 @@ impl SpawnParams {
138143
self
139144
}
140145

146+
pub fn allow_private_addresses(mut self, switch: bool) -> Self {
147+
self.allow_private_addresses = Some(switch);
148+
self
149+
}
150+
151+
pub fn whitelist(mut self, nodes: Vec<SocketAddr>) -> Self {
152+
self.whitelist = Some(nodes);
153+
self
154+
}
155+
141156
pub fn max_inbound_connections(mut self, max_inbound_connections: u32) -> Self {
142157
self.max_inbound_connections = Some(max_inbound_connections);
143158
self
@@ -273,6 +288,14 @@ impl SpawnParams {
273288
node_config.p2p.max_inbound_connections = Some(*max_inbound_connections);
274289
}
275290

291+
if let Some(allow_private_addresses) = &self.allow_private_addresses {
292+
node_config.p2p.allow_private_addresses = *allow_private_addresses;
293+
}
294+
295+
if let Some(whitelist) = &self.whitelist {
296+
node_config.p2p.whitelist = Some(whitelist.clone());
297+
}
298+
276299
if let Some(max_connections) = &self.max_connections {
277300
node_config.p2p.max_connections = Some(*max_connections);
278301
}

0 commit comments

Comments
 (0)