Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 73 additions & 27 deletions substrate/client/network/statement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl StatementHandlerPrototype {
);
}

let is_major_syncing = sync.is_major_syncing();
let handler = StatementHandler {
protocol_name: self.protocol_name,
notification_service: self.notification_service,
Expand All @@ -379,6 +380,8 @@ impl StatementHandlerPrototype {
initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
had_major_syncing: is_major_syncing,
deferred_peers: Vec::new(),
};

Ok(handler)
Expand Down Expand Up @@ -423,6 +426,10 @@ pub struct StatementHandler<
pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
/// Queue for round-robin processing of initial syncs.
initial_sync_peer_queue: VecDeque<PeerId>,
/// Whether the node was major syncing in the previous loop iteration
had_major_syncing: bool,
/// Peers whose statement protocol connection was deferred during major sync
deferred_peers: Vec<(PeerId, multiaddr::Multiaddr)>,
}

/// Per-peer rate limiter using a token bucket algorithm.
Expand Down Expand Up @@ -585,6 +592,8 @@ where
initial_sync_timeout: Box::pin(pending().fuse()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
had_major_syncing: false,
deferred_peers: Vec::new(),
}
}

Expand Down Expand Up @@ -639,6 +648,14 @@ where
Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
},
}

let currently_syncing = self.sync.is_major_syncing();
if !self.had_major_syncing && currently_syncing {
self.on_major_sync_started();
} else if self.had_major_syncing && !currently_syncing {
self.on_major_sync_complete();
}
self.had_major_syncing = currently_syncing;
}
}

Expand Down Expand Up @@ -692,15 +709,20 @@ where
SyncEvent::PeerConnected(remote) => {
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
.collect::<multiaddr::Multiaddr>();
let result = self.network.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
if self.sync.is_major_syncing() {
self.deferred_peers.push((remote, addr));
} else {
let result = self.network.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
}
}
},
SyncEvent::PeerDisconnected(remote) => {
self.deferred_peers.retain(|(p, _)| *p != remote);
let result = self.network.remove_peers_from_reserved_set(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this function actually disconnecting the nodes ? As far as I know they just get remove from reserved set but it does not mean they get disconnected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if reserved_only = true -> remove_peers_from_reserved_set does disconnect

reserved_only: set_config.non_reserved_mode.is_reserved_only(),

SS setup the NonReservedPeerMode::Deny at protocol creation

self.protocol_name.clone(),
iter::once(remote).collect(),
Expand All @@ -712,6 +734,37 @@ where
}
}

fn on_major_sync_started(&mut self) {
let connected: Vec<PeerId> = self.peers.keys().copied().collect();
for peer_id in connected {
let addr = iter::once(multiaddr::Protocol::P2p(peer_id.into()))
.collect::<multiaddr::Multiaddr>();
self.deferred_peers.push((peer_id, addr));

let result = self.network.remove_peers_from_reserved_set(
self.protocol_name.clone(),
iter::once(peer_id).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
}
}
}

/// Called when major sync completes reconnect all deferred peers
fn on_major_sync_complete(&mut self) {
let peers = std::mem::take(&mut self.deferred_peers);
for (_peer_id, addr) in peers {
let result = self.network.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
}
}
}

async fn handle_notification_event(&mut self, event: NotificationEvent) {
match event {
NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
Expand Down Expand Up @@ -745,18 +798,16 @@ where
metrics.peers_connected.set(self.peers.len() as u64);
});

if !self.sync.is_major_syncing() {
let hashes = self.statement_store.statement_hashes();
if !hashes.is_empty() {
self.pending_initial_syncs.insert(
peer,
PendingInitialSync { hashes, started_at: Instant::now() },
);
self.initial_sync_peer_queue.push_back(peer);
self.metrics.as_ref().map(|metrics| {
metrics.initial_sync_peers_active.inc();
});
}
let hashes = self.statement_store.statement_hashes();
if !hashes.is_empty() {
self.pending_initial_syncs.insert(
peer,
PendingInitialSync { hashes, started_at: Instant::now() },
);
self.initial_sync_peer_queue.push_back(peer);
self.metrics.as_ref().map(|metrics| {
metrics.initial_sync_peers_active.inc();
});
}
},
NotificationEvent::NotificationStreamClosed { peer } => {
Expand All @@ -781,15 +832,6 @@ where
metrics.bytes_received_total.inc_by(bytes_received);
});

// Accept statements only when node is not major syncing
if self.sync.is_major_syncing() {
log::trace!(
target: LOG_TARGET,
"{peer}: Ignoring statements while major syncing or offline"
);
return;
}

if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
self.on_statements(peer, statements);
} else {
Expand Down Expand Up @@ -1492,6 +1534,8 @@ mod tests {
initial_sync_timeout: Box::pin(futures::future::pending()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
had_major_syncing: false,
deferred_peers: Vec::new(),
};
(handler, statement_store, network, notification_service, queue_receiver)
}
Expand Down Expand Up @@ -1699,6 +1743,8 @@ mod tests {
initial_sync_timeout: Box::pin(futures::future::pending()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
had_major_syncing: false,
deferred_peers: Vec::new(),
};
(handler, statement_store, network, notification_service)
}
Expand Down
Loading