Skip to content

Commit 41ee5de

Browse files
committed
fix: gracefully handle dropped connections
1 parent 2d1c5b1 commit 41ee5de

File tree

3 files changed

+70
-58
lines changed

3 files changed

+70
-58
lines changed

modules/peer_network_interface/src/connection.rs

Lines changed: 21 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,8 @@ use crate::network::PeerMessageSender;
2020

2121
pub struct PeerConnection {
2222
pub address: String,
23-
chainsync: mpsc::Sender<ChainsyncCommand>,
24-
blockfetch: mpsc::Sender<BlockfetchCommand>,
23+
chainsync: mpsc::UnboundedSender<ChainsyncCommand>,
24+
blockfetch: mpsc::UnboundedSender<BlockfetchCommand>,
2525
}
2626

2727
impl PeerConnection {
@@ -31,8 +31,8 @@ impl PeerConnection {
3131
magic,
3232
sender,
3333
};
34-
let (chainsync_tx, chainsync_rx) = mpsc::channel(16);
35-
let (blockfetch_tx, blockfetch_rx) = mpsc::channel(16);
34+
let (chainsync_tx, chainsync_rx) = mpsc::unbounded_channel();
35+
let (blockfetch_tx, blockfetch_rx) = mpsc::unbounded_channel();
3636
tokio::spawn(async move {
3737
tokio::time::sleep(delay).await;
3838
worker.run(chainsync_rx, blockfetch_rx).await;
@@ -46,17 +46,17 @@ impl PeerConnection {
4646

4747
pub async fn find_tip(&self) -> Result<Point> {
4848
let (tx, rx) = oneshot::channel();
49-
self.chainsync.send(ChainsyncCommand::FindTip(tx)).await?;
49+
self.chainsync.send(ChainsyncCommand::FindTip(tx))?;
5050
Ok(rx.await?)
5151
}
5252

53-
pub async fn find_intersect(&self, points: Vec<Point>) -> Result<()> {
54-
self.chainsync.send(ChainsyncCommand::FindIntersect(points)).await?;
53+
pub fn find_intersect(&self, points: Vec<Point>) -> Result<()> {
54+
self.chainsync.send(ChainsyncCommand::FindIntersect(points))?;
5555
Ok(())
5656
}
5757

58-
pub async fn request_block(&self, hash: BlockHash, height: u64) -> Result<()> {
59-
self.blockfetch.send(BlockfetchCommand::Fetch(hash, height)).await?;
58+
pub fn request_block(&self, hash: BlockHash, height: u64) -> Result<()> {
59+
self.blockfetch.send(BlockfetchCommand::Fetch(hash, height))?;
6060
Ok(())
6161
}
6262
}
@@ -98,8 +98,8 @@ struct PeerConnectionWorker {
9898
impl PeerConnectionWorker {
9999
async fn run(
100100
mut self,
101-
chainsync: mpsc::Receiver<ChainsyncCommand>,
102-
blockfetch: mpsc::Receiver<BlockfetchCommand>,
101+
chainsync: mpsc::UnboundedReceiver<ChainsyncCommand>,
102+
blockfetch: mpsc::UnboundedReceiver<BlockfetchCommand>,
103103
) {
104104
if let Err(err) = self.do_run(chainsync, blockfetch).await {
105105
error!(peer = self.address, "{err:#}");
@@ -109,8 +109,8 @@ impl PeerConnectionWorker {
109109

110110
async fn do_run(
111111
&mut self,
112-
chainsync: mpsc::Receiver<ChainsyncCommand>,
113-
blockfetch: mpsc::Receiver<BlockfetchCommand>,
112+
chainsync: mpsc::UnboundedReceiver<ChainsyncCommand>,
113+
blockfetch: mpsc::UnboundedReceiver<BlockfetchCommand>,
114114
) -> Result<()> {
115115
let client = PeerClient::connect(self.address.clone(), self.magic).await?;
116116
select! {
@@ -122,7 +122,7 @@ impl PeerConnectionWorker {
122122
async fn run_chainsync(
123123
&self,
124124
mut client: chainsync::N2NClient,
125-
mut commands: mpsc::Receiver<ChainsyncCommand>,
125+
mut commands: mpsc::UnboundedReceiver<ChainsyncCommand>,
126126
) -> Result<()> {
127127
let mut reached = None;
128128
loop {
@@ -137,6 +137,12 @@ impl PeerConnectionWorker {
137137
let Some(cmd) = cmd else {
138138
bail!("parent process has disconnected");
139139
};
140+
if !client.has_agency() {
141+
// To run find_intersect, we must have agency.
142+
// If we don't, it's because we requested the next response already.
143+
// There's no way to cancel that request, so just wait for it to finish.
144+
client.recv_while_must_reply().await?;
145+
};
140146
match cmd {
141147
ChainsyncCommand::FindIntersect(points) => {
142148
let (point, _) = client.find_intersect(points).await?;
@@ -158,7 +164,7 @@ impl PeerConnectionWorker {
158164
async fn run_blockfetch(
159165
&self,
160166
mut client: blockfetch::Client,
161-
mut commands: mpsc::Receiver<BlockfetchCommand>,
167+
mut commands: mpsc::UnboundedReceiver<BlockfetchCommand>,
162168
) -> Result<()> {
163169
while let Some(BlockfetchCommand::Fetch(hash, slot)) = commands.recv().await {
164170
let point = Point::Specific(slot, hash.to_vec());

modules/peer_network_interface/src/network.rs

Lines changed: 46 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{HashMap, VecDeque},
2+
collections::{BTreeMap, HashMap, VecDeque},
33
time::Duration,
44
};
55

@@ -16,7 +16,7 @@ use tracing::{info, warn};
1616
pub struct NetworkManager {
1717
network_magic: u64,
1818
next_id: u64,
19-
peers: HashMap<PeerId, PeerConnection>,
19+
peers: BTreeMap<PeerId, PeerConnection>,
2020
preferred_upstream: Option<PeerId>,
2121
blocks_to_fetch: VecDeque<Header>,
2222
blocks: HashMap<BlockHash, BlockStatus>,
@@ -38,7 +38,7 @@ impl NetworkManager {
3838
Self {
3939
network_magic,
4040
next_id: 0,
41-
peers: HashMap::new(),
41+
peers: BTreeMap::new(),
4242
preferred_upstream: None,
4343
blocks_to_fetch: VecDeque::new(),
4444
blocks: HashMap::new(),
@@ -54,18 +54,18 @@ impl NetworkManager {
5454
pub async fn run(mut self) -> Result<()> {
5555
while let Some(event) = self.events.recv().await {
5656
match event {
57-
NetworkEvent::NewConnection { address, delay } => {
58-
self.handle_new_connection(address, delay).await
59-
}
6057
NetworkEvent::PeerUpdate { peer, event } => {
61-
self.handle_peer_update(peer, event).await?
58+
let maybe_publish_blocks = self.handle_peer_update(peer, event)?;
59+
if maybe_publish_blocks {
60+
self.publish_blocks().await?;
61+
}
6262
}
6363
}
6464
}
6565
bail!("event sink closed")
6666
}
6767

68-
pub async fn handle_new_connection(&mut self, address: String, delay: Duration) {
68+
pub fn handle_new_connection(&mut self, address: String, delay: Duration) {
6969
let id = PeerId(self.next_id);
7070
self.next_id += 1;
7171
let sender = PeerMessageSender {
@@ -75,10 +75,10 @@ impl NetworkManager {
7575
let conn = PeerConnection::new(address, self.network_magic, sender, delay);
7676
if self.preferred_upstream.is_none() {
7777
self.peers.insert(id, conn);
78-
self.set_preferred_upstream(id).await;
78+
self.set_preferred_upstream(id);
7979
} else {
8080
if let Some(head) = self.head.clone()
81-
&& let Err(error) = conn.find_intersect(vec![head]).await
81+
&& let Err(error) = conn.find_intersect(vec![head])
8282
{
8383
warn!("could not sync {}: {error}", conn.address);
8484
}
@@ -96,26 +96,26 @@ impl NetworkManager {
9696
};
9797
match conn.find_tip().await {
9898
Ok(point) => {
99-
self.sync_to_point(point).await;
99+
self.sync_to_point(point);
100100
return Ok(());
101101
}
102102
Err(e) => {
103103
warn!("could not fetch tip from {}: {e}", conn.address);
104-
self.handle_disconnect(upstream).await?;
104+
self.handle_disconnect(upstream);
105105
}
106106
}
107107
}
108108
}
109109

110-
pub async fn sync_to_point(&mut self, point: Point) {
110+
pub fn sync_to_point(&mut self, point: Point) {
111111
for conn in self.peers.values() {
112-
if let Err(error) = conn.find_intersect(vec![point.clone()]).await {
112+
if let Err(error) = conn.find_intersect(vec![point.clone()]) {
113113
warn!("could not sync {}: {error}", conn.address);
114114
}
115115
}
116116
}
117117

118-
async fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> Result<()> {
118+
fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> Result<bool> {
119119
let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer);
120120
match event {
121121
PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => {
@@ -132,21 +132,20 @@ impl NetworkManager {
132132
let Some(peer) = self.peers.get(&announcer) else {
133133
continue;
134134
};
135-
if let Err(e) = peer.request_block(header.hash, header.slot).await
135+
if let Err(e) = peer.request_block(header.hash, header.slot)
136136
{
137137
warn!("could not request block from {}: {e}", peer.address);
138-
self.handle_disconnect(announcer).await?
138+
self.handle_disconnect(announcer);
139139
}
140140
break; // only fetch from one
141141
}
142142
}
143+
Ok(false)
143144
}
144145
BlockStatus::Fetched(_) => {
145-
if is_preferred {
146-
// Chainsync has requested a block which we've already fetched,
147-
// so we might be able to publish one or more.
148-
self.publish_blocks().await?;
149-
}
146+
// If chainsync has requested a block which we've already fetched,
147+
// we might be able to publish one or more.
148+
Ok(is_preferred)
150149
}
151150
}
152151
}
@@ -173,40 +172,48 @@ impl NetworkManager {
173172
}
174173
}
175174
}
175+
Ok(false)
176176
}
177177
PeerEvent::BlockFetched(fetched) => {
178178
let Some(block) = self.blocks.get_mut(&fetched.hash) else {
179-
return Ok(());
179+
return Ok(false);
180180
};
181181
block.set_body(&fetched.body);
182-
self.publish_blocks().await?;
182+
Ok(true)
183+
}
184+
PeerEvent::Disconnected => {
185+
self.handle_disconnect(peer);
186+
Ok(false)
183187
}
184-
PeerEvent::Disconnected => self.handle_disconnect(peer).await?,
185188
}
186-
Ok(())
187189
}
188190

189-
async fn handle_disconnect(&mut self, peer: PeerId) -> Result<()> {
191+
fn handle_disconnect(&mut self, peer: PeerId) {
190192
let Some(conn) = self.peers.remove(&peer) else {
191-
return Ok(());
193+
return;
192194
};
195+
warn!("disconnected from {}", conn.address);
193196
let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer);
194197
if is_preferred && let Some(new_preferred) = self.peers.keys().next().copied() {
195-
self.set_preferred_upstream(new_preferred).await;
198+
self.set_preferred_upstream(new_preferred);
196199
}
197-
self.events_sender
198-
.send(NetworkEvent::NewConnection {
199-
address: conn.address,
200-
delay: Duration::from_secs(5),
201-
})
202-
.await?;
203-
Ok(())
200+
if self.peers.is_empty() {
201+
warn!("no upstream peers!");
202+
}
203+
let address = conn.address.clone();
204+
drop(conn);
205+
self.handle_new_connection(address, Duration::from_secs(5));
204206
}
205207

206-
async fn set_preferred_upstream(&mut self, peer: PeerId) {
208+
fn set_preferred_upstream(&mut self, peer: PeerId) {
209+
if let Some(conn) = self.peers.get(&peer) {
210+
info!("setting preferred upstream to {}", conn.address);
211+
} else {
212+
warn!("setting preferred upstream to unrecognized node {peer:?}");
213+
}
207214
self.preferred_upstream = Some(peer);
208215
if let Some(head) = self.head.clone() {
209-
self.sync_to_point(head).await;
216+
self.sync_to_point(head);
210217
}
211218
}
212219

@@ -229,11 +236,10 @@ impl NetworkManager {
229236
}
230237

231238
pub enum NetworkEvent {
232-
NewConnection { address: String, delay: Duration },
233239
PeerUpdate { peer: PeerId, event: PeerEvent },
234240
}
235241

236-
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
242+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
237243
pub struct PeerId(u64);
238244

239245
pub struct PeerMessageSender {

modules/peer_network_interface/src/peer_network_interface.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -58,18 +58,18 @@ impl PeerNetworkInterface {
5858

5959
let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink);
6060
for address in cfg.node_addresses {
61-
manager.handle_new_connection(address, Duration::ZERO).await;
61+
manager.handle_new_connection(address, Duration::ZERO);
6262
}
6363

6464
match cfg.sync_point {
65-
SyncPoint::Origin => manager.sync_to_point(Point::Origin).await,
65+
SyncPoint::Origin => manager.sync_to_point(Point::Origin),
6666
SyncPoint::Tip => manager.sync_to_tip().await?,
6767
SyncPoint::Cache => unimplemented!(),
6868
SyncPoint::Snapshot => {
6969
let mut subscription =
7070
snapshot_complete.expect("Snapshot topic subscription missing");
7171
let point = Self::wait_snapshot_completion(&mut subscription).await?;
72-
manager.sync_to_point(point).await;
72+
manager.sync_to_point(point);
7373
}
7474
}
7575

0 commit comments

Comments
 (0)