diff --git a/p2p/examples/feeler.rs b/p2p/examples/feeler.rs index a36941b..06c6ddd 100644 --- a/p2p/examples/feeler.rs +++ b/p2p/examples/feeler.rs @@ -22,9 +22,15 @@ fn main() { .set_user_agent("/bitcoin-feeler:0.1.0".to_string()) .connection_timeout(Duration::from_millis(3500)) .change_network(NETWORK) - .open_connection(socket_addr); + .open_feeler(socket_addr); match connection { - Ok(_) => tracing::info!("Connection successful!"), + Ok(f) => { + tracing::info!( + "Connection successful: Advertised protocol version {}, Adveristed services {}", + f.protocol_version.0, + f.services + ); + } Err(e) => tracing::warn!("Connection failed {e:?}"), } } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 1f5cc71..4474f94 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -70,6 +70,7 @@ impl ConnectionContext { read_half: ReadHalf, negotiation: Negotiation, their_services: ServiceFlags, + their_version: ProtocolVerison, ) -> Self { let read_ctx = ReadContext { read_half, @@ -83,6 +84,7 @@ impl ConnectionContext { write_half, negotiation, their_services, + their_protocol_verison: their_version, }; Self { read_ctx, @@ -210,6 +212,7 @@ pub struct WriteContext { write_half: WriteHalf, negotiation: Negotiation, their_services: ServiceFlags, + their_protocol_verison: ProtocolVerison, } impl WriteContext { @@ -432,6 +435,12 @@ impl Default for Offered { } } +#[derive(Debug, Clone, Copy)] +pub struct Feeler { + pub services: ServiceFlags, + pub protocol_version: ProtocolVerison, +} + pub(crate) struct MessageHeader { magic: Magic, _command: CommandString, @@ -490,7 +499,7 @@ fn interpret_first_message( nonce: u64, their_expected_version: ProtocolVerison, their_expected_services: ServiceFlags, -) -> Result<(), HandshakeError> { +) -> Result<(ProtocolVerison, ServiceFlags), HandshakeError> { if let NetworkMessage::Version(version) = message { if version.nonce.eq(&nonce) { return Err(HandshakeError::ConnectedToSelf); @@ -503,10 +512,10 @@ fn interpret_first_message( if !version.services.has(their_expected_services) { return Err(HandshakeError::UnsupportedFeature); } + Ok((ProtocolVerison(version.version), version.services)) } else { - return Err(HandshakeError::IrrelevantMessage(message)); + Err(HandshakeError::IrrelevantMessage(message)) } - Ok(()) } /// Errors when parsing a peer-to-peer message. @@ -656,8 +665,15 @@ macro_rules! define_version_message_logic { let version = $awaiter!(read_half.read_message(&mut $reader))?; match version { Some(version) => { - interpret_first_message(version, nonce, $conn.their_version, $conn.their_services) - .map_err(ConnectionError::Protocol)?; + let (protocol, services) = interpret_first_message( + version, + nonce, + $conn.their_version, + $conn.their_services, + ) + .map_err(ConnectionError::Protocol)?; + $conn.their_services = services; + $conn.their_version = protocol; } None => { return Err(ConnectionError::Protocol(HandshakeError::BadDecoy)); @@ -717,8 +733,13 @@ macro_rules! define_version_message_logic { &mut write_half, ))?; } - let context = - ConnectionContext::new(write_half, read_half, negotiation, $conn.their_services); + let context = ConnectionContext::new( + write_half, + read_half, + negotiation, + $conn.their_services, + $conn.their_version, + ); Ok(($reader, context)) }}; } diff --git a/p2p/src/net.rs b/p2p/src/net.rs index 6038d9a..8cac68d 100644 --- a/p2p/src/net.rs +++ b/p2p/src/net.rs @@ -11,7 +11,7 @@ use bitcoin::{consensus, p2p::message_compact_blocks::SendCmpct}; use crate::{ blocking_awaiter, interpret_first_message, make_version, version_handshake_blocking, - ConnectionBuilder, ConnectionContext, HandshakeError, Negotiation, ParseMessageError, + ConnectionBuilder, ConnectionContext, Feeler, HandshakeError, Negotiation, ParseMessageError, ReadContext, ReadHalf, WriteContext, WriteHalf, }; @@ -30,11 +30,15 @@ pub trait ConnectionExt { self, tcp_stream: TcpStream, ) -> Result<(TcpStream, ConnectionContext), ConnectionError>; + + /// Open a "feeler" connection to test if the peer is online and update their services and + /// protocol version. + fn open_feeler(self, to: impl Into) -> Result; } impl ConnectionExt for ConnectionBuilder { fn open_connection( - self, + mut self, to: impl Into, ) -> Result<(TcpStream, ConnectionContext), ConnectionError> { let socket_addr = to.into(); @@ -44,11 +48,26 @@ impl ConnectionExt for ConnectionBuilder { } fn start_handshake( - self, + mut self, mut tcp_stream: TcpStream, ) -> Result<(TcpStream, ConnectionContext), ConnectionError> { version_handshake_blocking!(tcp_stream, self) } + + fn open_feeler(mut self, to: impl Into) -> Result { + let socket_addr = to.into(); + let mut tcp_stream = TcpStream::connect_timeout(&socket_addr, self.tcp_timeout)?; + let res: Result<(TcpStream, ConnectionContext), ConnectionError> = + version_handshake_blocking!(tcp_stream, self); + let (_, ctx) = res?; + let (_, wtx) = ctx.into_split(); + let services = wtx.their_services; + let protocol_version = wtx.their_protocol_verison; + Ok(Feeler { + services, + protocol_version, + }) + } } #[allow(clippy::result_large_err)] diff --git a/p2p/src/tokio_ext.rs b/p2p/src/tokio_ext.rs index cca390f..41ea607 100644 --- a/p2p/src/tokio_ext.rs +++ b/p2p/src/tokio_ext.rs @@ -14,7 +14,7 @@ use tokio::{ use crate::{ async_awaiter, interpret_first_message, make_version, version_handshake_async, - ConnectionBuilder, ConnectionContext, HandshakeError, Negotiation, ParseMessageError, + ConnectionBuilder, ConnectionContext, Feeler, HandshakeError, Negotiation, ParseMessageError, ReadContext, ReadHalf, WriteContext, WriteHalf, }; @@ -36,13 +36,17 @@ pub trait TokioConnectionExt { self, tcp_stream: TcpStream, ) -> Result<(TcpStream, ConnectionContext), Self::Error>; + + /// + #[allow(async_fn_in_trait)] + async fn open_feeler(self, to: impl Into) -> Result; } impl TokioConnectionExt for ConnectionBuilder { type Error = ConnectionError; async fn open_connection( - self, + mut self, to: impl Into, ) -> Result<(TcpStream, ConnectionContext), Self::Error> { let socket_addr = to.into(); @@ -53,11 +57,27 @@ impl TokioConnectionExt for ConnectionBuilder { } async fn start_handshake( - self, + mut self, mut tcp_stream: TcpStream, ) -> Result<(TcpStream, ConnectionContext), Self::Error> { version_handshake_async!(tcp_stream, self) } + + async fn open_feeler(mut self, to: impl Into) -> Result { + let socket_addr = to.into(); + let timeout = tokio::time::timeout(self.tcp_timeout, TcpStream::connect(socket_addr)).await; + let mut tcp_stream = + timeout.map_err(|_| ConnectionError::Protocol(HandshakeError::Timeout))??; + let res: Result<(TcpStream, ConnectionContext), Self::Error> = + version_handshake_async!(tcp_stream, self); + let (_, ctx) = res?; + let services = ctx.write_ctx.their_services; + let protocol_version = ctx.write_ctx.their_protocol_verison; + Ok(Feeler { + services, + protocol_version, + }) + } } async fn write_message(