Skip to content

Commit 4ab2285

Browse files
authored
Merge pull request #978 from openmina/feat/better_seed_node
Better seed node
2 parents 2454ad2 + 9885a7c commit 4ab2285

File tree

13 files changed

+127
-49
lines changed

13 files changed

+127
-49
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/src/commands/node/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ pub struct Node {
7575
#[arg(long, env)]
7676
pub peer_list_url: Option<Url>,
7777

78+
#[arg(long, default_value = "100")]
79+
pub max_peers: usize,
80+
7881
/// Run the node in seed mode. No default peers will be added.
7982
#[arg(long, env)]
8083
pub seed: bool,
@@ -206,6 +209,7 @@ impl Node {
206209
.filter_map(|s| s.parse().ok()),
207210
);
208211

212+
node_builder.p2p_max_peers(self.max_peers);
209213
self.seed.then(|| node_builder.p2p_seed_node());
210214
self.no_peers_discovery
211215
.then(|| node_builder.p2p_no_discovery());

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ tracing = { version = "0.1", features = ["std"] }
1414
sha2 = "0.10.6"
1515
binprot = { git = "https://github.com/openmina/binprot-rs", rev = "2b5a909" }
1616
binprot_derive = { git = "https://github.com/openmina/binprot-rs", rev = "2b5a909" }
17+
rand = "0.8.0"
1718
redux = { workspace = true }
1819
tokio = { version = "1.26", features = ["sync"] }
1920
time = { version = "0.3", features = ["formatting", "macros", "parsing"] }

core/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ mod work_dir {
5252

5353
pub use work_dir::{get_debug_dir, get_work_dir, set_work_dir};
5454

55+
use rand::prelude::*;
56+
#[inline(always)]
57+
pub fn pseudo_rng(time: redux::Timestamp) -> StdRng {
58+
StdRng::seed_from_u64(time.into())
59+
}
60+
5561
pub fn preshared_key(chain_id: &ChainId) -> [u8; 32] {
5662
use multihash::Hasher;
5763
let mut hasher = Blake2b256::default();

node/native/src/node/builder.rs

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
net::IpAddr,
55
path::Path,
66
sync::Arc,
7+
time::Duration,
78
};
89

910
use anyhow::Context;
@@ -34,13 +35,10 @@ pub struct NodeBuilder {
3435
rng_seed: [u8; 32],
3536
custom_initial_time: Option<redux::Timestamp>,
3637
genesis_config: Arc<GenesisConfig>,
38+
p2p: P2pConfig,
3739
p2p_sec_key: Option<P2pSecretKey>,
38-
p2p_libp2p_port: Option<u16>,
3940
p2p_is_seed: bool,
40-
p2p_no_discovery: bool,
4141
p2p_is_started: bool,
42-
initial_peers: Vec<P2pConnectionOutgoingInitOpts>,
43-
external_addrs: Vec<IpAddr>,
4442
block_producer: Option<BlockProducerConfig>,
4543
snarker: Option<SnarkerConfig>,
4644
service: NodeServiceBuilder,
@@ -68,13 +66,25 @@ impl NodeBuilder {
6866
rng_seed,
6967
custom_initial_time: None,
7068
genesis_config,
69+
p2p: P2pConfig {
70+
libp2p_port: None,
71+
listen_port: None,
72+
// Must be replaced with builder api.
73+
identity_pub_key: P2pSecretKey::deterministic(0).public_key(),
74+
initial_peers: Vec::new(),
75+
external_addrs: Vec::new(),
76+
enabled_channels: ChannelId::iter_all().collect(),
77+
peer_discovery: true,
78+
meshsub: P2pMeshsubConfig {
79+
initial_time: Duration::ZERO,
80+
..Default::default()
81+
},
82+
timeouts: P2pTimeouts::default(),
83+
limits: P2pLimits::default().with_max_peers(Some(100)),
84+
},
7185
p2p_sec_key: None,
72-
p2p_libp2p_port: None,
7386
p2p_is_seed: false,
74-
p2p_no_discovery: false,
7587
p2p_is_started: false,
76-
initial_peers: Vec::new(),
77-
external_addrs: Vec::new(),
7888
block_producer: None,
7989
snarker: None,
8090
service: NodeServiceBuilder::new(rng_seed),
@@ -94,12 +104,13 @@ impl NodeBuilder {
94104

95105
/// If not called, random one will be generated and used instead.
96106
pub fn p2p_sec_key(&mut self, key: P2pSecretKey) -> &mut Self {
107+
self.p2p.identity_pub_key = key.public_key();
97108
self.p2p_sec_key = Some(key);
98109
self
99110
}
100111

101112
pub fn p2p_libp2p_port(&mut self, port: u16) -> &mut Self {
102-
self.p2p_libp2p_port = Some(port);
113+
self.p2p.libp2p_port = Some(port);
103114
self
104115
}
105116

@@ -110,7 +121,7 @@ impl NodeBuilder {
110121
}
111122

112123
pub fn p2p_no_discovery(&mut self) -> &mut Self {
113-
self.p2p_no_discovery = true;
124+
self.p2p.peer_discovery = false;
114125
self
115126
}
116127

@@ -119,19 +130,19 @@ impl NodeBuilder {
119130
&mut self,
120131
peers: impl IntoIterator<Item = P2pConnectionOutgoingInitOpts>,
121132
) -> &mut Self {
122-
self.initial_peers.extend(peers);
133+
self.p2p.initial_peers.extend(peers);
123134
self
124135
}
125136

126137
pub fn external_addrs(&mut self, v: impl Iterator<Item = IpAddr>) -> &mut Self {
127-
self.external_addrs.extend(v);
138+
self.p2p.external_addrs.extend(v);
128139
self
129140
}
130141

131142
/// Extend p2p initial peers from file.
132143
pub fn initial_peers_from_file(&mut self, path: impl AsRef<Path>) -> anyhow::Result<&mut Self> {
133144
peers_from_reader(
134-
&mut self.initial_peers,
145+
&mut self.p2p.initial_peers,
135146
File::open(&path).context(anyhow::anyhow!(
136147
"opening peer list file {:?}",
137148
path.as_ref()
@@ -152,14 +163,19 @@ impl NodeBuilder {
152163
) -> anyhow::Result<&mut Self> {
153164
let url = url.into_url().context("failed to parse peers url")?;
154165
peers_from_reader(
155-
&mut self.initial_peers,
166+
&mut self.p2p.initial_peers,
156167
reqwest::blocking::get(url.clone())
157168
.context(anyhow::anyhow!("reading peer list url {url}"))?,
158169
)
159170
.context(anyhow::anyhow!("reading peer list url {url}"))?;
160171
Ok(self)
161172
}
162173

174+
pub fn p2p_max_peers(&mut self, limit: usize) -> &mut Self {
175+
self.p2p.limits = self.p2p.limits.with_max_peers(Some(limit));
176+
self
177+
}
178+
163179
/// Override default p2p task spawner.
164180
pub fn p2p_custom_task_spawner(
165181
&mut self,
@@ -274,15 +290,15 @@ impl NodeBuilder {
274290
self
275291
}
276292

277-
pub fn build(self) -> anyhow::Result<Node> {
293+
pub fn build(mut self) -> anyhow::Result<Node> {
278294
let p2p_sec_key = self.p2p_sec_key.unwrap_or_else(P2pSecretKey::rand);
279-
let initial_peers = if self.initial_peers.is_empty() && !self.p2p_is_seed {
280-
default_peers()
281-
} else {
282-
self.initial_peers
283-
};
295+
if self.p2p.initial_peers.is_empty() && !self.p2p_is_seed {
296+
self.p2p.initial_peers = default_peers();
297+
}
284298

285-
let initial_peers = initial_peers
299+
self.p2p.initial_peers = self
300+
.p2p
301+
.initial_peers
286302
.into_iter()
287303
.filter_map(|opts| match opts {
288304
P2pConnectionOutgoingInitOpts::LibP2P(mut opts) => {
@@ -293,8 +309,6 @@ impl NodeBuilder {
293309
})
294310
.collect();
295311

296-
let external_addrs = self.external_addrs;
297-
298312
let srs = self.verifier_srs.unwrap_or_else(get_srs);
299313
let block_verifier_index = self
300314
.block_verifier_index
@@ -306,6 +320,9 @@ impl NodeBuilder {
306320
let initial_time = self
307321
.custom_initial_time
308322
.unwrap_or_else(redux::Timestamp::global_now);
323+
self.p2p.meshsub.initial_time = initial_time
324+
.checked_sub(redux::Timestamp::ZERO)
325+
.unwrap_or_default();
309326

310327
let protocol_constants = self.genesis_config.protocol_constants()?;
311328
let consensus_consts =
@@ -319,23 +336,7 @@ impl NodeBuilder {
319336
consensus_constants: consensus_consts.clone(),
320337
testing_run: false,
321338
},
322-
p2p: P2pConfig {
323-
libp2p_port: self.p2p_libp2p_port,
324-
listen_port: self.http_port,
325-
identity_pub_key: p2p_sec_key.public_key(),
326-
initial_peers,
327-
external_addrs,
328-
enabled_channels: ChannelId::iter_all().collect(),
329-
peer_discovery: !self.p2p_no_discovery,
330-
meshsub: P2pMeshsubConfig {
331-
initial_time: initial_time
332-
.checked_sub(redux::Timestamp::ZERO)
333-
.unwrap_or_default(),
334-
..Default::default()
335-
},
336-
timeouts: P2pTimeouts::default(),
337-
limits: P2pLimits::default().with_max_peers(Some(100)),
338-
},
339+
p2p: self.p2p,
339340
ledger: LedgerConfig {},
340341
snark: SnarkConfig {
341342
block_verifier_index,

node/src/action_kind.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ pub enum ActionKind {
343343
P2pDisconnectionFinish,
344344
P2pDisconnectionInit,
345345
P2pDisconnectionPeerClosed,
346+
P2pDisconnectionRandomTry,
346347
P2pDisconnectionEffectfulInit,
347348
P2pEffectfulInitialize,
348349
P2pIdentifyNewRequest,
@@ -705,7 +706,7 @@ pub enum ActionKind {
705706
}
706707

707708
impl ActionKind {
708-
pub const COUNT: u16 = 595;
709+
pub const COUNT: u16 = 596;
709710
}
710711

711712
impl std::fmt::Display for ActionKind {
@@ -1212,6 +1213,7 @@ impl ActionKindGet for P2pConnectionAction {
12121213
impl ActionKindGet for P2pDisconnectionAction {
12131214
fn kind(&self) -> ActionKind {
12141215
match self {
1216+
Self::RandomTry => ActionKind::P2pDisconnectionRandomTry,
12151217
Self::Init { .. } => ActionKind::P2pDisconnectionInit,
12161218
Self::PeerClosed { .. } => ActionKind::P2pDisconnectionPeerClosed,
12171219
Self::FailedCleanup { .. } => ActionKind::P2pDisconnectionFailedCleanup,

node/src/state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ impl State {
309309
}
310310

311311
pub fn pseudo_rng(&self) -> StdRng {
312-
StdRng::seed_from_u64(self.time().into())
312+
crate::core::pseudo_rng(self.time())
313313
}
314314

315315
/// Must be called in the global reducer as the last thing only once

p2p/src/disconnection/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use crate::{
1515

1616
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)]
1717
pub enum P2pDisconnectionReason {
18+
#[error("random disconnection in order to free up space so that more peers can connect")]
19+
FreeUpSpace,
1820
#[error("message is unexpected for channel {0}")]
1921
P2pChannelMsgUnexpected(ChannelId),
2022
#[error("failed to send message to channel: {0}")]

p2p/src/disconnection/p2p_disconnection_actions.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use openmina_core::ActionEvent;
24
use serde::{Deserialize, Serialize};
35

@@ -6,9 +8,12 @@ use crate::{P2pPeerStatus, P2pState, PeerId};
68

79
pub type P2pDisconnectionActionWithMetaRef<'a> = redux::ActionWithMeta<&'a P2pDisconnectionAction>;
810

11+
const RANDOM_DISCONNECTION_TRY_FREQUENCY: Duration = Duration::from_secs(10);
12+
913
#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
1014
#[action_event(level = debug)]
1115
pub enum P2pDisconnectionAction {
16+
RandomTry,
1217
/// Initialize disconnection.
1318
#[action_event(fields(display(peer_id), display(reason)), level = info)]
1419
Init {
@@ -17,17 +22,26 @@ pub enum P2pDisconnectionAction {
1722
},
1823
/// Peer disconnection.
1924
#[action_event(fields(display(peer_id)), level = info)]
20-
PeerClosed { peer_id: PeerId },
25+
PeerClosed {
26+
peer_id: PeerId,
27+
},
2128
#[action_event(fields(display(peer_id)), level = info)]
22-
FailedCleanup { peer_id: PeerId },
29+
FailedCleanup {
30+
peer_id: PeerId,
31+
},
2332
/// Finish disconnecting from a peer.
2433
#[action_event(fields(display(peer_id)), level = debug)]
25-
Finish { peer_id: PeerId },
34+
Finish {
35+
peer_id: PeerId,
36+
},
2637
}
2738

2839
impl redux::EnablingCondition<P2pState> for P2pDisconnectionAction {
29-
fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool {
40+
fn is_enabled(&self, state: &P2pState, time: redux::Timestamp) -> bool {
3041
match self {
42+
P2pDisconnectionAction::RandomTry => time
43+
.checked_sub(state.last_random_disconnection_try)
44+
.map_or(false, |dur| dur >= RANDOM_DISCONNECTION_TRY_FREQUENCY),
3145
P2pDisconnectionAction::Init { peer_id, .. }
3246
| P2pDisconnectionAction::PeerClosed { peer_id, .. }
3347
| P2pDisconnectionAction::Finish { peer_id } => {

p2p/src/disconnection/p2p_disconnection_reducer.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
1-
use openmina_core::{bug_condition, Substate};
1+
use std::time::Duration;
2+
3+
use openmina_core::{bug_condition, pseudo_rng, Substate};
4+
use rand::prelude::*;
25
use redux::ActionWithMeta;
36

47
use crate::{
58
disconnection_effectful::P2pDisconnectionEffectfulAction, P2pNetworkSchedulerAction,
69
P2pPeerAction, P2pPeerStatus, P2pState,
710
};
811

9-
use super::{P2pDisconnectedState, P2pDisconnectionAction};
12+
use super::{P2pDisconnectedState, P2pDisconnectionAction, P2pDisconnectionReason};
13+
14+
/// Do not disconnect peer for this duration just for freeing up peer space.
15+
const FORCE_PEER_STABLE_FOR: Duration = Duration::from_secs(90);
1016

1117
impl P2pDisconnectedState {
1218
pub fn reducer<Action, State>(
@@ -21,6 +27,30 @@ impl P2pDisconnectedState {
2127
let p2p_state = state_context.get_substate_mut()?;
2228

2329
match action {
30+
P2pDisconnectionAction::RandomTry => {
31+
p2p_state.last_random_disconnection_try = meta.time();
32+
if p2p_state.config.limits.max_stable_peers()
33+
>= p2p_state.ready_peers_iter().count()
34+
{
35+
return Ok(());
36+
}
37+
let mut rng = pseudo_rng(meta.time());
38+
39+
let peer_id = p2p_state
40+
.ready_peers_iter()
41+
.filter(|(_, s)| s.connected_for(meta.time()) > FORCE_PEER_STABLE_FOR)
42+
.map(|(id, _)| *id)
43+
.choose(&mut rng);
44+
45+
if let Some(peer_id) = peer_id {
46+
let dispatcher = state_context.into_dispatcher();
47+
dispatcher.push(P2pDisconnectionAction::Init {
48+
peer_id,
49+
reason: P2pDisconnectionReason::FreeUpSpace,
50+
});
51+
}
52+
Ok(())
53+
}
2454
P2pDisconnectionAction::Init { peer_id, reason } => {
2555
#[cfg(feature = "p2p-libp2p")]
2656
if p2p_state.is_libp2p_peer(&peer_id) {

0 commit comments

Comments
 (0)