Skip to content

Commit 5a5531c

Browse files
committed
refactor(network): remove unnecessary Mutex for reading and writing types
1 parent 4518685 commit 5a5531c

File tree

3 files changed

+17
-22
lines changed

3 files changed

+17
-22
lines changed

src/network/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use bitcoin::{
1111
use tokio::{
1212
io::{AsyncRead, AsyncWrite},
1313
net::TcpStream,
14-
sync::Mutex,
1514
time::Instant,
1615
};
1716

@@ -36,8 +35,8 @@ pub const RUST_BITCOIN_VERSION: &str = "0.32.4";
3635
const THIRTY_MINS: u64 = 60 * 30;
3736
const CONNECTION_TIMEOUT: u64 = 2;
3837

39-
pub(crate) type StreamReader = Mutex<Box<dyn AsyncRead + Send + Unpin>>;
40-
pub(crate) type StreamWriter = Mutex<Box<dyn AsyncWrite + Send + Unpin>>;
38+
pub(crate) type StreamReader = Box<dyn AsyncRead + Send + Sync + Unpin>;
39+
pub(crate) type StreamWriter = Box<dyn AsyncWrite + Send + Sync + Unpin>;
4140

4241
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
4342
pub(crate) struct PeerId(pub(crate) u32);
@@ -134,7 +133,7 @@ impl ConnectionType {
134133
match timeout {
135134
Ok(stream) => {
136135
let (reader, writer) = stream.into_split();
137-
Ok((Mutex::new(Box::new(reader)), Mutex::new(Box::new(writer))))
136+
Ok((Box::new(reader), Box::new(writer)))
138137
}
139138
Err(_) => Err(PeerError::ConnectionFailed),
140139
}

src/network/parsers.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ impl V1MessageParser {
2424
}
2525

2626
async fn do_read_message(&mut self) -> Result<Option<NetworkMessage>, PeerReadError> {
27-
let mut stream = self.stream.lock().await;
2827
let mut message_buf = vec![0_u8; 24];
29-
let _ = stream
28+
let _ = self
29+
.stream
3030
.read_exact(&mut message_buf)
3131
.await
3232
.map_err(|_| PeerReadError::ReadBuffer)?;
@@ -42,7 +42,8 @@ impl V1MessageParser {
4242
return Err(PeerReadError::Deserialization);
4343
}
4444
let mut contents_buf = vec![0_u8; header.length as usize];
45-
let _ = stream
45+
let _ = self
46+
.stream
4647
.read_exact(&mut contents_buf)
4748
.await
4849
.map_err(|_| PeerReadError::ReadBuffer)?;
@@ -70,9 +71,9 @@ impl V2MessageParser {
7071
}
7172

7273
async fn do_read_message(&mut self) -> Result<Option<NetworkMessage>, PeerReadError> {
73-
let mut stream = self.stream.lock().await;
7474
let mut len_buf = [0; 3];
75-
let _ = stream
75+
let _ = self
76+
.stream
7677
.read_exact(&mut len_buf)
7778
.await
7879
.map_err(|_| PeerReadError::ReadBuffer)?;
@@ -81,7 +82,8 @@ impl V2MessageParser {
8182
return Err(PeerReadError::TooManyMessages);
8283
}
8384
let mut response_message = vec![0; message_len];
84-
let _ = stream
85+
let _ = self
86+
.stream
8587
.read_exact(&mut response_message)
8688
.await
8789
.map_err(|_| PeerReadError::ReadBuffer)?;

src/network/peer.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,21 +73,16 @@ impl Peer {
7373

7474
pub async fn run(
7575
&mut self,
76-
reader: StreamReader,
77-
writer: StreamWriter,
76+
mut reader: StreamReader,
77+
mut writer: StreamWriter,
7878
) -> Result<(), PeerError> {
7979
let start_time = Instant::now();
8080
let (tx, mut rx) = mpsc::channel(32);
81-
let mut lock = writer.lock().await;
82-
let writer = lock.deref_mut();
83-
8481
// If a peer signals for V2 we will use it, otherwise just use plaintext.
8582
let (message_mutex, mut peer_reader) = if self.services.has(ServiceFlags::P2P_V2) {
86-
let mut lock = reader.lock().await;
87-
let read_lock = lock.deref_mut();
8883
let handshake_result = tokio::time::timeout(
8984
Duration::from_secs(HANDSHAKE_TIMEOUT),
90-
self.try_handshake(writer, read_lock),
85+
self.try_handshake(&mut writer, &mut reader),
9186
)
9287
.await
9388
.map_err(|_| PeerError::HandshakeFailed)?;
@@ -101,7 +96,6 @@ impl Peer {
10196
let (decryptor, encryptor) = handshake_result?;
10297
let message_mutex: MutexMessageGenerator =
10398
Mutex::new(Box::new(V2OutboundMessage::new(self.network, encryptor)));
104-
drop(lock);
10599
let reader = Reader::new(V2MessageParser::new(reader, decryptor), tx);
106100
(message_mutex, reader)
107101
} else {
@@ -114,7 +108,7 @@ impl Peer {
114108
let mut message_lock = message_mutex.lock().await;
115109
let outbound_messages = message_lock.deref_mut();
116110
let message = outbound_messages.version_message(None)?;
117-
self.write_bytes(writer, message).await?;
111+
self.write_bytes(&mut writer, message).await?;
118112
self.message_counter.sent_version();
119113
let read_handle = tokio::spawn(async move {
120114
peer_reader
@@ -147,7 +141,7 @@ impl Peer {
147141
if let Ok(peer_message) = peer_message {
148142
match peer_message {
149143
Some(message) => {
150-
match self.handle_peer_message(message, writer, outbound_messages).await {
144+
match self.handle_peer_message(message, &mut writer, outbound_messages).await {
151145
Ok(()) => continue,
152146
Err(e) => {
153147
match e {
@@ -166,7 +160,7 @@ impl Peer {
166160
node_message = self.main_thread_recv.recv() => {
167161
match node_message {
168162
Some(message) => {
169-
match self.main_thread_request(message, writer, outbound_messages).await {
163+
match self.main_thread_request(message, &mut writer, outbound_messages).await {
170164
Ok(()) => continue,
171165
Err(e) => {
172166
match e {

0 commit comments

Comments
 (0)