Skip to content

Commit 9158441

Browse files
authored
Merge pull request #286 from rustaceanrob/peer-id-2-6
Introduce `PeerId` for stricter type meaning across the crate
2 parents 0aaf4e4 + cf0617c commit 9158441

File tree

5 files changed

+57
-34
lines changed

5 files changed

+57
-34
lines changed

src/core/channel_messages.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use bitcoin::{
1111

1212
use crate::core::messages::RejectPayload;
1313

14+
use super::PeerId;
15+
1416
#[derive(Debug, Clone)]
1517
pub(crate) enum MainThreadMessage {
1618
GetAddr,
@@ -36,7 +38,7 @@ pub struct GetBlockConfig {
3638
}
3739

3840
pub(crate) struct PeerThreadMessage {
39-
pub nonce: u32,
41+
pub nonce: PeerId,
4042
pub message: PeerMessage,
4143
}
4244

src/core/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//! To build a [`Node`](node::Node) and [`Client`](client::Client), please refer to the [`NodeBuilder`](builder::NodeBuilder), which allows for node
88
//! configuration.
99
10+
use std::hash::Hash;
1011
use std::time::Duration;
1112

1213
use tokio::time::Instant;
@@ -84,3 +85,24 @@ impl PeerTimeoutConfig {
8485
}
8586
}
8687
}
88+
89+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
90+
pub(crate) struct PeerId(u32);
91+
92+
impl PeerId {
93+
pub(crate) fn increment(&mut self) {
94+
self.0 = self.0.wrapping_add(1)
95+
}
96+
}
97+
98+
impl From<u32> for PeerId {
99+
fn from(value: u32) -> Self {
100+
PeerId(value)
101+
}
102+
}
103+
104+
impl std::fmt::Display for PeerId {
105+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106+
write!(f, "Peer {}", self.0)
107+
}
108+
}

src/core/node.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use super::{
3838
dialog::Dialog,
3939
error::NodeError,
4040
messages::{ClientMessage, Event, Log, SyncUpdate, Warning},
41-
FilterSyncPolicy, LastBlockMonitor, PeerTimeoutConfig,
41+
FilterSyncPolicy, LastBlockMonitor, PeerId, PeerTimeoutConfig,
4242
};
4343

4444
pub(crate) const ADDR_V2_VERSION: u32 = 70015;
@@ -211,13 +211,13 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
211211
};
212212
let response = self.handle_version(peer_thread.nonce, version, best).await?;
213213
self.send_message(peer_thread.nonce, response).await;
214-
self.dialog.send_dialog(format!("[Peer {}]: version", peer_thread.nonce))
214+
self.dialog.send_dialog(format!("[{}]: version", peer_thread.nonce))
215215
.await;
216216
}
217217
PeerMessage::Addr(addresses) => self.handle_new_addrs(addresses).await,
218218
PeerMessage::Headers(headers) => {
219219
last_block.reset();
220-
self.dialog.send_dialog(format!("[Peer {}]: headers", peer_thread.nonce))
220+
self.dialog.send_dialog(format!("[{}]: headers", peer_thread.nonce))
221221
.await;
222222
match self.handle_headers(peer_thread.nonce, headers).await {
223223
Some(response) => {
@@ -227,7 +227,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
227227
}
228228
}
229229
PeerMessage::FilterHeaders(cf_headers) => {
230-
self.dialog.send_dialog(format!("[Peer {}]: filter headers", peer_thread.nonce)).await;
230+
self.dialog.send_dialog(format!("[{}]: filter headers", peer_thread.nonce)).await;
231231
match self.handle_cf_headers(peer_thread.nonce, cf_headers).await {
232232
Some(response) => {
233233
self.broadcast(response).await;
@@ -250,7 +250,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
250250
None => continue,
251251
},
252252
PeerMessage::NewBlocks(blocks) => {
253-
self.dialog.send_dialog(format!("[Peer {}]: inv", peer_thread.nonce))
253+
self.dialog.send_dialog(format!("[{}]: inv", peer_thread.nonce))
254254
.await;
255255
match self.handle_inventory_blocks(peer_thread.nonce, blocks).await {
256256
Some(response) => {
@@ -340,7 +340,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
340340
}
341341

342342
// Send a message to a specified peer
343-
async fn send_message(&self, nonce: u32, message: MainThreadMessage) {
343+
async fn send_message(&self, nonce: PeerId, message: MainThreadMessage) {
344344
let mut peer_map = self.peer_map.lock().await;
345345
peer_map.send_message(nonce, message).await;
346346
}
@@ -524,7 +524,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
524524
// We accepted a handshake with a peer but we may disconnect if they do not support CBF
525525
async fn handle_version(
526526
&self,
527-
nonce: u32,
527+
nonce: PeerId,
528528
version_message: VersionMessage,
529529
best_height: u32,
530530
) -> Result<MainThreadMessage, NodeError<H::Error, P::Error>> {
@@ -590,7 +590,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
590590
// We always send headers to our peers, so our next message depends on our state
591591
async fn handle_headers(
592592
&self,
593-
peer_id: u32,
593+
peer_id: PeerId,
594594
headers: Vec<Header>,
595595
) -> Option<MainThreadMessage> {
596596
let mut chain = self.chain.lock().await;
@@ -624,11 +624,11 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
624624
// Compact filter headers may result in a number of outcomes, including the need to audit filters.
625625
async fn handle_cf_headers(
626626
&self,
627-
peer_id: u32,
627+
peer_id: PeerId,
628628
cf_headers: CFHeaders,
629629
) -> Option<MainThreadMessage> {
630630
let mut chain = self.chain.lock().await;
631-
match chain.sync_cf_headers(peer_id, cf_headers).await {
631+
match chain.sync_cf_headers(peer_id.0, cf_headers).await {
632632
Ok(potential_message) => match potential_message {
633633
AppendAttempt::AddedToQueue => None,
634634
AppendAttempt::Extended => self.next_stateful_message(chain.deref_mut()).await,
@@ -652,7 +652,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
652652
}
653653

654654
// Handle a new compact block filter
655-
async fn handle_filter(&self, peer_id: u32, filter: CFilter) -> Option<MainThreadMessage> {
655+
async fn handle_filter(&self, peer_id: PeerId, filter: CFilter) -> Option<MainThreadMessage> {
656656
let mut chain = self.chain.lock().await;
657657
match chain.sync_filter(filter).await {
658658
Ok(potential_message) => potential_message.map(MainThreadMessage::GetFilters),
@@ -673,7 +673,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
673673
}
674674

675675
// Scan a block for transactions.
676-
async fn handle_block(&self, peer_id: u32, block: Block) -> Option<MainThreadMessage> {
676+
async fn handle_block(&self, peer_id: PeerId, block: Block) -> Option<MainThreadMessage> {
677677
let mut chain = self.chain.lock().await;
678678
if let Err(e) = chain.check_send_block(block).await {
679679
self.dialog.send_warning(Warning::UnexpectedSyncError {
@@ -713,7 +713,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
713713
// If new inventory came in, we need to download the headers and update the node state
714714
async fn handle_inventory_blocks(
715715
&self,
716-
nonce: u32,
716+
nonce: PeerId,
717717
blocks: Vec<BlockHash>,
718718
) -> Option<MainThreadMessage> {
719719
let mut state = self.state.write().await;

src/core/peer_map.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use super::{
3535
dialog::Dialog,
3636
error::PeerManagerError,
3737
messages::Warning,
38-
PeerTimeoutConfig,
38+
PeerId, PeerTimeoutConfig,
3939
};
4040

4141
const MAX_TRIES: usize = 50;
@@ -58,11 +58,11 @@ pub(crate) struct ManagedPeer {
5858
// The `PeerMap` manages connections with peers, adds and bans peers, and manages the peer database
5959
#[derive(Debug)]
6060
pub(crate) struct PeerMap<P: PeerStore> {
61-
num_peers: u32,
62-
heights: HashMap<u32, u32>,
61+
current_id: PeerId,
62+
heights: HashMap<PeerId, u32>,
6363
network: Network,
6464
mtx: Sender<PeerThreadMessage>,
65-
map: HashMap<u32, ManagedPeer>,
65+
map: HashMap<PeerId, ManagedPeer>,
6666
db: Arc<Mutex<P>>,
6767
connector: Arc<Mutex<dyn NetworkConnector + Send + Sync>>,
6868
whitelist: Whitelist,
@@ -94,7 +94,7 @@ impl<P: PeerStore> PeerMap<P> {
9494
}
9595
};
9696
Self {
97-
num_peers: 0,
97+
current_id: PeerId(0),
9898
heights: HashMap::new(),
9999
network,
100100
mtx,
@@ -139,7 +139,7 @@ impl<P: PeerStore> PeerMap<P> {
139139
}
140140

141141
// Set the time offset of a connected peer
142-
pub fn set_offset(&mut self, peer: u32, time: i64) {
142+
pub fn set_offset(&mut self, peer: PeerId, time: i64) {
143143
if let Some(peer) = self.map.get_mut(&peer) {
144144
let now = SystemTime::now()
145145
.duration_since(UNIX_EPOCH)
@@ -162,10 +162,9 @@ impl<P: PeerStore> PeerMap<P> {
162162
// Send out a TCP connection to a new peer and begin tracking the task
163163
pub async fn dispatch(&mut self, loaded_peer: PersistedPeer) -> Result<(), PeerError> {
164164
let (ptx, prx) = mpsc::channel::<MainThreadMessage>(32);
165-
let peer_num = self.num_peers + 1;
166-
self.num_peers = peer_num;
165+
self.current_id.increment();
167166
let mut peer = Peer::new(
168-
peer_num,
167+
self.current_id,
169168
self.network,
170169
self.mtx.clone(),
171170
prx,
@@ -188,7 +187,7 @@ impl<P: PeerStore> PeerMap<P> {
188187
.await?;
189188
let handle = tokio::spawn(async move { peer.run(reader, writer).await });
190189
self.map.insert(
191-
peer_num,
190+
self.current_id,
192191
ManagedPeer {
193192
service_flags: loaded_peer.services,
194193
address: loaded_peer.addr,
@@ -203,26 +202,26 @@ impl<P: PeerStore> PeerMap<P> {
203202
}
204203

205204
// Set the minimum fee rate this peer will accept
206-
pub fn set_broadcast_min(&mut self, nonce: u32, fee_rate: FeeRate) {
205+
pub fn set_broadcast_min(&mut self, nonce: PeerId, fee_rate: FeeRate) {
207206
if let Some(peer) = self.map.get_mut(&nonce) {
208207
peer.broadcast_min = fee_rate;
209208
}
210209
}
211210

212211
// Set the services of a peer
213-
pub fn set_services(&mut self, nonce: u32, flags: ServiceFlags) {
212+
pub fn set_services(&mut self, nonce: PeerId, flags: ServiceFlags) {
214213
if let Some(peer) = self.map.get_mut(&nonce) {
215214
peer.service_flags = flags
216215
}
217216
}
218217

219218
// Set the height of a peer upon receiving the version message
220-
pub fn set_height(&mut self, nonce: u32, height: u32) {
219+
pub fn set_height(&mut self, nonce: PeerId, height: u32) {
221220
self.heights.insert(nonce, height);
222221
}
223222

224223
// Add one to the height of a peer when receiving inventory
225-
pub fn add_one_height(&mut self, nonce: u32) {
224+
pub fn add_one_height(&mut self, nonce: PeerId) {
226225
if let Some(height) = self.heights.get(&nonce) {
227226
self.heights.insert(nonce, height + 1);
228227
}
@@ -243,7 +242,7 @@ impl<P: PeerStore> PeerMap<P> {
243242
}
244243

245244
// Send a message to the specified peer
246-
pub async fn send_message(&mut self, nonce: u32, message: MainThreadMessage) {
245+
pub async fn send_message(&mut self, nonce: PeerId, message: MainThreadMessage) {
247246
if let Some(peer) = self.map.get(&nonce) {
248247
let _ = peer.ptx.send(message).await;
249248
}
@@ -344,7 +343,7 @@ impl<P: PeerStore> PeerMap<P> {
344343
}
345344

346345
// We tried this peer and successfully connected.
347-
pub async fn tried(&mut self, nonce: u32) {
346+
pub async fn tried(&mut self, nonce: PeerId) {
348347
if let Some(peer) = self.map.get(&nonce) {
349348
let mut db = self.db.lock().await;
350349
if let Err(e) = db
@@ -367,7 +366,7 @@ impl<P: PeerStore> PeerMap<P> {
367366
}
368367

369368
// This peer misbehaved in some way.
370-
pub async fn ban(&mut self, nonce: u32) {
369+
pub async fn ban(&mut self, nonce: PeerId) {
371370
if let Some(peer) = self.map.get(&nonce) {
372371
let mut db = self.db.lock().await;
373372
if let Err(e) = db

src/network/peer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818
channel_messages::{MainThreadMessage, PeerMessage, PeerThreadMessage},
1919
dialog::Dialog,
2020
messages::Warning,
21-
PeerTimeoutConfig,
21+
PeerId, PeerTimeoutConfig,
2222
},
2323
network::outbound_messages::V1OutboundMessage,
2424
};
@@ -42,7 +42,7 @@ const HANDSHAKE_TIMEOUT: u64 = 4;
4242
type MutexMessageGenerator = Mutex<Box<dyn MessageGenerator>>;
4343

4444
pub(crate) struct Peer {
45-
nonce: u32,
45+
nonce: PeerId,
4646
main_thread_sender: Sender<PeerThreadMessage>,
4747
main_thread_recv: Receiver<MainThreadMessage>,
4848
network: Network,
@@ -54,7 +54,7 @@ pub(crate) struct Peer {
5454

5555
impl Peer {
5656
pub fn new(
57-
nonce: u32,
57+
nonce: PeerId,
5858
network: Network,
5959
main_thread_sender: Sender<PeerThreadMessage>,
6060
main_thread_recv: Receiver<MainThreadMessage>,

0 commit comments

Comments
 (0)