Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions p2p/examples/feeler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ fn main() {
.set_user_agent("/bitcoin-feeler:0.1.0".to_string())
.connection_timeout(Duration::from_millis(3500))
.change_network(NETWORK)
.open_connection(socket_addr);
.open_feeler(socket_addr);
match connection {
Ok(_) => tracing::info!("Connection successful!"),
Ok(f) => {
tracing::info!(
"Connection successful: Advertised protocol version {}, Adveristed services {}",
f.protocol_version.0,
f.services
);
}
Err(e) => tracing::warn!("Connection failed {e:?}"),
}
}
35 changes: 28 additions & 7 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl ConnectionContext {
read_half: ReadHalf,
negotiation: Negotiation,
their_services: ServiceFlags,
their_version: ProtocolVerison,
) -> Self {
let read_ctx = ReadContext {
read_half,
Expand All @@ -83,6 +84,7 @@ impl ConnectionContext {
write_half,
negotiation,
their_services,
their_protocol_verison: their_version,
};
Self {
read_ctx,
Expand Down Expand Up @@ -210,6 +212,7 @@ pub struct WriteContext {
write_half: WriteHalf,
negotiation: Negotiation,
their_services: ServiceFlags,
their_protocol_verison: ProtocolVerison,
}

impl WriteContext {
Expand Down Expand Up @@ -432,6 +435,12 @@ impl Default for Offered {
}
}

#[derive(Debug, Clone, Copy)]
pub struct Feeler {
pub services: ServiceFlags,
pub protocol_version: ProtocolVerison,
}

pub(crate) struct MessageHeader {
magic: Magic,
_command: CommandString,
Expand Down Expand Up @@ -490,7 +499,7 @@ fn interpret_first_message(
nonce: u64,
their_expected_version: ProtocolVerison,
their_expected_services: ServiceFlags,
) -> Result<(), HandshakeError> {
) -> Result<(ProtocolVerison, ServiceFlags), HandshakeError> {
if let NetworkMessage::Version(version) = message {
if version.nonce.eq(&nonce) {
return Err(HandshakeError::ConnectedToSelf);
Expand All @@ -503,10 +512,10 @@ fn interpret_first_message(
if !version.services.has(their_expected_services) {
return Err(HandshakeError::UnsupportedFeature);
}
Ok((ProtocolVerison(version.version), version.services))
} else {
return Err(HandshakeError::IrrelevantMessage(message));
Err(HandshakeError::IrrelevantMessage(message))
}
Ok(())
}

/// Errors when parsing a peer-to-peer message.
Expand Down Expand Up @@ -656,8 +665,15 @@ macro_rules! define_version_message_logic {
let version = $awaiter!(read_half.read_message(&mut $reader))?;
match version {
Some(version) => {
interpret_first_message(version, nonce, $conn.their_version, $conn.their_services)
.map_err(ConnectionError::Protocol)?;
let (protocol, services) = interpret_first_message(
version,
nonce,
$conn.their_version,
$conn.their_services,
)
.map_err(ConnectionError::Protocol)?;
$conn.their_services = services;
$conn.their_version = protocol;
}
None => {
return Err(ConnectionError::Protocol(HandshakeError::BadDecoy));
Expand Down Expand Up @@ -717,8 +733,13 @@ macro_rules! define_version_message_logic {
&mut write_half,
))?;
}
let context =
ConnectionContext::new(write_half, read_half, negotiation, $conn.their_services);
let context = ConnectionContext::new(
write_half,
read_half,
negotiation,
$conn.their_services,
$conn.their_version,
);
Ok(($reader, context))
}};
}
Expand Down
25 changes: 22 additions & 3 deletions p2p/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use bitcoin::{consensus, p2p::message_compact_blocks::SendCmpct};

use crate::{
blocking_awaiter, interpret_first_message, make_version, version_handshake_blocking,
ConnectionBuilder, ConnectionContext, HandshakeError, Negotiation, ParseMessageError,
ConnectionBuilder, ConnectionContext, Feeler, HandshakeError, Negotiation, ParseMessageError,
ReadContext, ReadHalf, WriteContext, WriteHalf,
};

Expand All @@ -30,11 +30,15 @@ pub trait ConnectionExt {
self,
tcp_stream: TcpStream,
) -> Result<(TcpStream, ConnectionContext), ConnectionError>;

/// Open a "feeler" connection to test if the peer is online and update their services and
/// protocol version.
fn open_feeler(self, to: impl Into<SocketAddr>) -> Result<Feeler, ConnectionError>;
}

impl ConnectionExt for ConnectionBuilder {
fn open_connection(
self,
mut self,
to: impl Into<SocketAddr>,
) -> Result<(TcpStream, ConnectionContext), ConnectionError> {
let socket_addr = to.into();
Expand All @@ -44,11 +48,26 @@ impl ConnectionExt for ConnectionBuilder {
}

fn start_handshake(
self,
mut self,
mut tcp_stream: TcpStream,
) -> Result<(TcpStream, ConnectionContext), ConnectionError> {
version_handshake_blocking!(tcp_stream, self)
}

fn open_feeler(mut self, to: impl Into<SocketAddr>) -> Result<Feeler, ConnectionError> {
let socket_addr = to.into();
let mut tcp_stream = TcpStream::connect_timeout(&socket_addr, self.tcp_timeout)?;
let res: Result<(TcpStream, ConnectionContext), ConnectionError> =
version_handshake_blocking!(tcp_stream, self);
let (_, ctx) = res?;
let (_, wtx) = ctx.into_split();
let services = wtx.their_services;
let protocol_version = wtx.their_protocol_verison;
Ok(Feeler {
services,
protocol_version,
})
}
}

#[allow(clippy::result_large_err)]
Expand Down
26 changes: 23 additions & 3 deletions p2p/src/tokio_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{

use crate::{
async_awaiter, interpret_first_message, make_version, version_handshake_async,
ConnectionBuilder, ConnectionContext, HandshakeError, Negotiation, ParseMessageError,
ConnectionBuilder, ConnectionContext, Feeler, HandshakeError, Negotiation, ParseMessageError,
ReadContext, ReadHalf, WriteContext, WriteHalf,
};

Expand All @@ -36,13 +36,17 @@ pub trait TokioConnectionExt {
self,
tcp_stream: TcpStream,
) -> Result<(TcpStream, ConnectionContext), Self::Error>;

///
#[allow(async_fn_in_trait)]
async fn open_feeler(self, to: impl Into<SocketAddr>) -> Result<Feeler, Self::Error>;
}

impl TokioConnectionExt for ConnectionBuilder {
type Error = ConnectionError;

async fn open_connection(
self,
mut self,
to: impl Into<SocketAddr>,
) -> Result<(TcpStream, ConnectionContext), Self::Error> {
let socket_addr = to.into();
Expand All @@ -53,11 +57,27 @@ impl TokioConnectionExt for ConnectionBuilder {
}

async fn start_handshake(
self,
mut self,
mut tcp_stream: TcpStream,
) -> Result<(TcpStream, ConnectionContext), Self::Error> {
version_handshake_async!(tcp_stream, self)
}

async fn open_feeler(mut self, to: impl Into<SocketAddr>) -> Result<Feeler, Self::Error> {
let socket_addr = to.into();
let timeout = tokio::time::timeout(self.tcp_timeout, TcpStream::connect(socket_addr)).await;
let mut tcp_stream =
timeout.map_err(|_| ConnectionError::Protocol(HandshakeError::Timeout))??;
let res: Result<(TcpStream, ConnectionContext), Self::Error> =
version_handshake_async!(tcp_stream, self);
let (_, ctx) = res?;
let services = ctx.write_ctx.their_services;
let protocol_version = ctx.write_ctx.their_protocol_verison;
Ok(Feeler {
services,
protocol_version,
})
}
}

async fn write_message<W: AsyncWriteExt + Send + Sync + Unpin>(
Expand Down
Loading