Skip to content

Commit 8c50107

Browse files
committed
fixes
1 parent 96c5ba3 commit 8c50107

File tree

9 files changed

+189
-51
lines changed

9 files changed

+189
-51
lines changed

modules/peer_network_interface/config.default.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,7 @@ peer-sharing-enabled = true
3838
churn-interval-secs = 600
3939
# Timeout in seconds for a full peer-sharing exchange
4040
peer-sharing-timeout-secs = 10
41+
# Timeout in seconds for TCP connect to a peer
42+
connect-timeout-secs = 15
43+
# Accept IPv6 peer addresses from peer-sharing (set true on dual-stack hosts)
44+
ipv6-enabled = false

modules/peer_network_interface/src/block_flow_consensus_scenarios_tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ async fn make_harness() -> TestHarness {
113113
peer_sharing_enabled: false,
114114
churn_interval_secs: 600,
115115
peer_sharing_timeout_secs: 10,
116+
connect_timeout_secs: 15,
117+
ipv6_enabled: false,
116118
};
117119

118120
let flow = BlockFlowHandler::new(

modules/peer_network_interface/src/configuration.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ pub struct InterfaceConfig {
3838
pub churn_interval_secs: u64,
3939
#[serde(default = "default_peer_sharing_timeout_secs")]
4040
pub peer_sharing_timeout_secs: u64,
41+
#[serde(default = "default_connect_timeout_secs")]
42+
pub connect_timeout_secs: u64,
43+
#[serde(default = "default_ipv6_enabled")]
44+
pub ipv6_enabled: bool,
4145
}
4246

4347
fn default_consensus_topic() -> String {
@@ -68,6 +72,14 @@ fn default_peer_sharing_timeout_secs() -> u64 {
6872
10
6973
}
7074

75+
fn default_connect_timeout_secs() -> u64 {
76+
15
77+
}
78+
79+
fn default_ipv6_enabled() -> bool {
80+
false
81+
}
82+
7183
impl InterfaceConfig {
7284
pub fn try_load(config: &Config) -> Result<Self> {
7385
let full_config = Config::builder()

modules/peer_network_interface/src/connection.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,18 @@ pub struct PeerConnection {
2525
}
2626

2727
impl PeerConnection {
28-
pub fn new(address: String, magic: u32, sender: PeerMessageSender, delay: Duration) -> Self {
28+
pub fn new(
29+
address: String,
30+
magic: u32,
31+
sender: PeerMessageSender,
32+
delay: Duration,
33+
connect_timeout: Duration,
34+
) -> Self {
2935
let worker = PeerConnectionWorker {
3036
address: address.clone(),
3137
magic,
3238
sender,
39+
connect_timeout,
3340
};
3441
let (chainsync_tx, chainsync_rx) = mpsc::unbounded_channel();
3542
let (blockfetch_tx, blockfetch_rx) = mpsc::unbounded_channel();
@@ -100,6 +107,7 @@ struct PeerConnectionWorker {
100107
address: String,
101108
magic: u32,
102109
sender: PeerMessageSender,
110+
connect_timeout: Duration,
103111
}
104112

105113
impl PeerConnectionWorker {
@@ -119,7 +127,13 @@ impl PeerConnectionWorker {
119127
chainsync: mpsc::UnboundedReceiver<ChainsyncCommand>,
120128
blockfetch: mpsc::UnboundedReceiver<BlockfetchCommand>,
121129
) -> Result<()> {
122-
let client = PeerClient::connect(self.address.clone(), self.magic.into()).await?;
130+
let timeout_dur = self.connect_timeout;
131+
let client = tokio::time::timeout(
132+
timeout_dur,
133+
PeerClient::connect(self.address.clone(), self.magic.into()),
134+
)
135+
.await
136+
.map_err(|_| anyhow::anyhow!("connect timeout after {}s", timeout_dur.as_secs()))??;
123137
select! {
124138
res = self.run_chainsync(client.chainsync, chainsync) => res,
125139
res = self.run_blockfetch(client.blockfetch, blockfetch) => res,

modules/peer_network_interface/src/network.rs

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ pub struct NetworkManager {
9494
/// Addresses from the static `node_addresses` config. Configured peers are always
9595
/// retried on disconnect and are never blacklisted via `mark_failed`.
9696
configured_addrs: HashSet<String>,
97+
connect_timeout: Duration,
98+
ipv6_enabled: bool,
9799
}
98100

99101
impl NetworkManager {
@@ -110,6 +112,8 @@ impl NetworkManager {
110112
peer_sharing_enabled: bool,
111113
churn_interval_secs: u64,
112114
peer_sharing_timeout_secs: u64,
115+
connect_timeout_secs: u64,
116+
ipv6_enabled: bool,
113117
) -> Self {
114118
let peer_manager = if peer_sharing_enabled {
115119
Some(PeerManager::new(PeerManagerConfig {
@@ -140,6 +144,8 @@ impl NetworkManager {
140144
min_hot_peers,
141145
cold_origin: HashSet::new(),
142146
configured_addrs,
147+
connect_timeout: Duration::from_secs(connect_timeout_secs),
148+
ipv6_enabled,
143149
};
144150

145151
if peer_sharing_enabled {
@@ -217,16 +223,17 @@ impl NetworkManager {
217223
let hot: HashSet<String> =
218224
self.peers.values().map(|p| p.conn.address.clone()).collect();
219225
if let Some(ref mut pm) = self.peer_manager {
220-
let count = addresses.len();
226+
let received = addresses.len();
221227
let queried_peer = self
222228
.peers
223229
.get(&from_peer)
224230
.map(|p| p.conn.address.as_str())
225231
.unwrap_or("unknown");
226-
pm.add_discovered(addresses, &hot);
232+
let added = pm.add_discovered(addresses, &hot);
227233
info!(
228234
queried_peer,
229-
discovered = count,
235+
received,
236+
added,
230237
cold_count = pm.cold_count(),
231238
"peer-sharing discovery batch complete"
232239
);
@@ -329,6 +336,7 @@ impl NetworkManager {
329336
let amount = pm.config().target_peer_count.min(255) as u8;
330337
let timeout = Duration::from_secs(pm.config().peer_sharing_timeout_secs);
331338
let sender = self.events_sender.clone();
339+
let ipv6 = self.ipv6_enabled;
332340

333341
info!(
334342
peer = %address,
@@ -339,7 +347,7 @@ impl NetworkManager {
339347
);
340348

341349
tokio::spawn(async move {
342-
match request_peers(&address, magic, amount, timeout).await {
350+
match request_peers(&address, magic, amount, timeout, ipv6).await {
343351
Ok(addrs) => {
344352
info!(
345353
peer = %address,
@@ -448,7 +456,13 @@ impl NetworkManager {
448456
sink: self.events_sender.clone(),
449457
id,
450458
};
451-
let conn = PeerConnection::new(address, self.network_magic, sender, delay);
459+
let conn = PeerConnection::new(
460+
address,
461+
self.network_magic,
462+
sender,
463+
delay,
464+
self.connect_timeout,
465+
);
452466
let peer = PeerData::new(conn);
453467
let points = self.flow_handler.handle_new_connection(id, self.sync_point.as_ref());
454468
peer.find_intersect(points);
@@ -545,7 +559,10 @@ impl NetworkManager {
545559
// Ghost disconnect: peer was already removed (e.g. churn dropped the PeerData,
546560
// which caused the worker to exit and emit a Disconnected event). Safe to
547561
// ignore — flow_handler was already called in on_churn.
548-
debug!(peer_id = id.0, "ignoring ghost disconnect for already-removed peer");
562+
debug!(
563+
peer_id = id.0,
564+
"ignoring ghost disconnect for already-removed peer"
565+
);
549566
return;
550567
};
551568
warn!(address = %peer.conn.address, "disconnected from peer");
@@ -601,7 +618,8 @@ impl NetworkManager {
601618
let needs_promotion = self.peers.len() < self.min_hot_peers;
602619
let promoted = needs_promotion && self.try_promote_cold_peer();
603620
if promoted {
604-
let hot: HashSet<String> = self.peers.values().map(|p| p.conn.address.clone()).collect();
621+
let hot: HashSet<String> =
622+
self.peers.values().map(|p| p.conn.address.clone()).collect();
605623
if let Some(ref mut pm) = self.peer_manager {
606624
pm.demote_to_cold(address, &hot);
607625
}
@@ -746,6 +764,8 @@ mod tests {
746764
peer_sharing_enabled: false,
747765
churn_interval_secs: 600,
748766
peer_sharing_timeout_secs: 10,
767+
connect_timeout_secs: 15,
768+
ipv6_enabled: false,
749769
};
750770

751771
let flow_handler = BlockFlowHandler::new(
@@ -769,6 +789,8 @@ mod tests {
769789
false,
770790
600,
771791
10,
792+
15,
793+
false,
772794
)
773795
}
774796

@@ -783,6 +805,7 @@ mod tests {
783805
0,
784806
sender,
785807
Duration::from_secs(3600),
808+
Duration::from_secs(15),
786809
);
787810
manager.peers.insert(peer, PeerData::new(conn));
788811
}
@@ -823,6 +846,8 @@ mod tests {
823846
peer_sharing_enabled: true,
824847
churn_interval_secs: 600,
825848
peer_sharing_timeout_secs: 10,
849+
connect_timeout_secs: 15,
850+
ipv6_enabled: false,
826851
};
827852

828853
let flow_handler = BlockFlowHandler::new(
@@ -846,6 +871,8 @@ mod tests {
846871
cfg.peer_sharing_enabled,
847872
cfg.churn_interval_secs,
848873
cfg.peer_sharing_timeout_secs,
874+
cfg.connect_timeout_secs,
875+
cfg.ipv6_enabled,
849876
);
850877

851878
// Seed a cold peer manually
@@ -874,15 +901,25 @@ mod tests {
874901
pm.contains_cold("hot.peer.example.com:3001"),
875902
"disconnected hot peer should be returned to cold to stay in rotation"
876903
);
877-
assert_eq!(pm.cold_count(), 1, "net cold count: promoted one, returned one");
904+
assert_eq!(
905+
pm.cold_count(),
906+
1,
907+
"net cold count: promoted one, returned one"
908+
);
878909
}
879910

880911
fn add_test_peer_with_address(manager: &mut NetworkManager, peer: PeerId, address: &str) {
881912
let sender = PeerMessageSender {
882913
sink: manager.events_sender.clone(),
883914
id: peer,
884915
};
885-
let conn = PeerConnection::new(address.to_string(), 0, sender, Duration::from_secs(3600));
916+
let conn = PeerConnection::new(
917+
address.to_string(),
918+
0,
919+
sender,
920+
Duration::from_secs(3600),
921+
Duration::from_secs(15),
922+
);
886923
manager.peers.insert(peer, PeerData::new(conn));
887924
}
888925

@@ -908,6 +945,8 @@ mod tests {
908945
peer_sharing_enabled: false, // disabled
909946
churn_interval_secs: 600,
910947
peer_sharing_timeout_secs: 10,
948+
connect_timeout_secs: 15,
949+
ipv6_enabled: false,
911950
};
912951

913952
let flow_handler = BlockFlowHandler::new(
@@ -931,6 +970,8 @@ mod tests {
931970
cfg.peer_sharing_enabled,
932971
cfg.churn_interval_secs,
933972
cfg.peer_sharing_timeout_secs,
973+
cfg.connect_timeout_secs,
974+
cfg.ipv6_enabled,
934975
);
935976

936977
assert!(
@@ -966,6 +1007,8 @@ mod tests {
9661007
peer_sharing_enabled: true,
9671008
churn_interval_secs: 600,
9681009
peer_sharing_timeout_secs: 10,
1010+
connect_timeout_secs: 15,
1011+
ipv6_enabled: false,
9691012
};
9701013

9711014
let flow_handler = BlockFlowHandler::new(
@@ -989,6 +1032,8 @@ mod tests {
9891032
cfg.peer_sharing_enabled,
9901033
cfg.churn_interval_secs,
9911034
cfg.peer_sharing_timeout_secs,
1035+
cfg.connect_timeout_secs,
1036+
cfg.ipv6_enabled,
9921037
);
9931038

9941039
let addresses = vec![
@@ -1033,6 +1078,8 @@ mod tests {
10331078
peer_sharing_enabled: true,
10341079
churn_interval_secs: 600,
10351080
peer_sharing_timeout_secs: 10,
1081+
connect_timeout_secs: 15,
1082+
ipv6_enabled: false,
10361083
};
10371084

10381085
let flow_handler = BlockFlowHandler::new(
@@ -1056,6 +1103,8 @@ mod tests {
10561103
cfg.peer_sharing_enabled,
10571104
cfg.churn_interval_secs,
10581105
cfg.peer_sharing_timeout_secs,
1106+
cfg.connect_timeout_secs,
1107+
cfg.ipv6_enabled,
10591108
);
10601109

10611110
// Add 4 hot peers
@@ -1088,6 +1137,8 @@ mod tests {
10881137
peer_sharing_enabled: true,
10891138
churn_interval_secs: 600,
10901139
peer_sharing_timeout_secs: 10,
1140+
connect_timeout_secs: 15,
1141+
ipv6_enabled: false,
10911142
};
10921143

10931144
let flow_handler = BlockFlowHandler::new(
@@ -1111,6 +1162,8 @@ mod tests {
11111162
cfg.peer_sharing_enabled,
11121163
cfg.churn_interval_secs,
11131164
cfg.peer_sharing_timeout_secs,
1165+
cfg.connect_timeout_secs,
1166+
cfg.ipv6_enabled,
11141167
);
11151168

11161169
// Add exactly min_hot_peers = 3 peers

modules/peer_network_interface/src/peer_manager.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,17 +105,21 @@ impl PeerManager {
105105
/// # TODO(ledger-peers): `seed_from_ledger(addresses, hot)` follows the same pattern —
106106
/// add a sibling method that accepts relay addresses from `SPOStateMessage` and applies
107107
/// the same deduplication and cap enforcement.
108-
pub fn add_discovered(&mut self, addresses: Vec<String>, hot: &HashSet<String>) {
108+
pub fn add_discovered(&mut self, addresses: Vec<String>, hot: &HashSet<String>) -> usize {
109109
let cold_cap = self.config.target_peer_count * 4;
110+
let mut added = 0usize;
110111
for addr in addresses {
111112
if self.is_known(&addr, hot) {
112113
continue;
113114
}
114115
if self.cold_peers.len() >= cold_cap {
115116
self.evict_random_cold();
116117
}
117-
self.cold_peers.insert(addr);
118+
if self.cold_peers.insert(addr) {
119+
added += 1;
120+
}
118121
}
122+
added
119123
}
120124

121125
/// Remove and return a randomly selected cold peer address for promotion.

modules/peer_network_interface/src/peer_network_interface.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ impl PeerNetworkInterface {
155155
cfg.peer_sharing_enabled,
156156
cfg.churn_interval_secs,
157157
cfg.peer_sharing_timeout_secs,
158+
cfg.connect_timeout_secs,
159+
cfg.ipv6_enabled,
158160
);
159161

160162
match sync_point {

0 commit comments

Comments
 (0)