Skip to content

Commit e9d81bb

Browse files
author
0x_antoni
committed
optimize wave2
1 parent dfd7889 commit e9d81bb

File tree

8 files changed

+89
-88
lines changed

8 files changed

+89
-88
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ hyper-util = "0.1"
322322
async-trait = "0.1"
323323
tower = "0.4"
324324
pin-project = "1.0"
325-
mio = "1.0.4"
325+
mio = { version = "1.0.4", features = ["os-ext", "net", "os-poll"] }
326326
mio-timer = { git = "https://github.com/Pana/mio-timer", rev = "251170b" }
327327
mio-misc = { git = "https://github.com/Pana/mio-misc", rev = "27ad80f" }
328328
reqwest = "0.12"

crates/network/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ cfx-addr = { workspace = true }
99
cfx-bytes = { workspace = true }
1010
cfx-types = { workspace = true }
1111
cfxkey = { workspace = true }
12-
mio = { workspace = true }
12+
mio = { workspace = true, features = ["os-ext", "net", "os-poll"] }
1313
parking_lot = { workspace = true }
1414
log = { workspace = true }
1515
slab = { workspace = true }

crates/network/src/connection.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use log::{debug, trace};
1313
use metrics::{
1414
register_meter_with_group, Gauge, GaugeUsize, Histogram, Meter, Sample,
1515
};
16-
use mio::{net::TcpStream, Interest, Poll, Token};
16+
use mio::{net::TcpStream, Interest, Registry, Token};
1717
use priority_send_queue::{PrioritySendQueue, SendQueuePriority};
1818
use serde::Deserialize;
1919
use serde_derive::Serialize;
@@ -398,7 +398,7 @@ impl Connection {
398398

399399
/// Register this connection with the IO event loop.
400400
pub fn register_socket(
401-
&mut self, reg: Token, event_loop: &Poll,
401+
&mut self, reg: Token, poll_registry: &Registry,
402402
) -> io::Result<()> {
403403
if self.registered.load(AtomicOrdering::SeqCst) {
404404
return Ok(());
@@ -409,9 +409,7 @@ impl Connection {
409409
reg
410410
);
411411
if let Err(e) =
412-
event_loop
413-
.registry()
414-
.register(&mut self.socket, reg, self.interest)
412+
poll_registry.register(&mut self.socket, reg, self.interest)
415413
{
416414
trace!(
417415
"Failed to register socket, token = {}, reg = {:?}, err = {:?}",
@@ -427,17 +425,17 @@ impl Connection {
427425
/// Update connection registration. Should be called at the end of the IO
428426
/// handler.
429427
pub fn update_socket(
430-
&mut self, reg: Token, event_loop: &Poll,
428+
&mut self, reg: Token, poll_registry: &Registry,
431429
) -> io::Result<()> {
432430
trace!(
433431
"Connection reregister, token = {}, reg = {:?}",
434432
self.token,
435433
reg
436434
);
437435
if !self.registered.load(AtomicOrdering::SeqCst) {
438-
self.register_socket(reg, event_loop)
436+
self.register_socket(reg, poll_registry)
439437
} else {
440-
event_loop.registry()
438+
poll_registry
441439
.reregister(&mut self.socket, reg, self.interest)
442440
.unwrap_or_else(|e| {
443441
trace!("Failed to reregister socket, token = {}, reg = {:?}, err = {:?}", self.token, reg, e);
@@ -448,9 +446,11 @@ impl Connection {
448446

449447
/// Delete connection registration. Should be called at the end of the IO
450448
/// handler.
451-
pub fn deregister_socket(&mut self, event_loop: &Poll) -> io::Result<()> {
449+
pub fn deregister_socket(
450+
&mut self, poll_registry: &Registry,
451+
) -> io::Result<()> {
452452
trace!("Connection deregister, token = {}", self.token);
453-
event_loop.registry().deregister(&mut self.socket).ok();
453+
poll_registry.deregister(&mut self.socket).ok();
454454
Ok(())
455455
}
456456

crates/network/src/service.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{
1818
use keccak_hash::keccak;
1919
use mio::{
2020
net::{TcpListener, TcpStream, UdpSocket},
21-
Interest, Poll, Token,
21+
Interest, Registry, Token,
2222
};
2323
use parity_path::restrict_permissions_owner;
2424
use parking_lot::{Mutex, RwLock};
@@ -171,7 +171,6 @@ impl<'a> UdpIoContext<'a> {
171171
pub struct NetworkService {
172172
pub io_service: Option<IoService<NetworkIoMessage>>,
173173
pub inner: Option<Arc<NetworkServiceInner>>,
174-
network_poll: Arc<Mutex<Poll>>,
175174
config: NetworkConfiguration,
176175
}
177176

@@ -180,7 +179,6 @@ impl NetworkService {
180179
NetworkService {
181180
io_service: None,
182181
inner: None,
183-
network_poll: Arc::new(Mutex::new(Poll::new().unwrap())),
184182
config,
185183
}
186184
}
@@ -200,7 +198,7 @@ impl NetworkService {
200198
&mut self, pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
201199
) -> Result<(), Error> {
202200
let raw_io_service =
203-
IoService::<NetworkIoMessage>::start(self.network_poll.clone())?;
201+
IoService::<NetworkIoMessage>::start(STOP_NET_POLL)?;
204202
self.io_service = Some(raw_io_service);
205203

206204
if self.inner.is_none() {
@@ -226,18 +224,10 @@ impl NetworkService {
226224

227225
pub fn start(&self) {
228226
let handler = self.inner.as_ref().unwrap().clone();
229-
let main_event_loop_channel =
230-
self.io_service.as_ref().unwrap().channel();
231227
self.io_service
232228
.as_ref()
233229
.expect("Already set")
234-
.start_network_poll(
235-
self.network_poll.clone(),
236-
handler,
237-
main_event_loop_channel,
238-
MAX_SESSIONS,
239-
STOP_NET_POLL,
240-
);
230+
.start_network_poll(handler, MAX_SESSIONS);
241231
}
242232

243233
/// Add a P2P peer to the client as a trusted node
@@ -1756,19 +1746,19 @@ impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
17561746
}
17571747

17581748
fn register_stream(
1759-
&self, stream: StreamToken, reg: Token, event_loop: &Poll,
1749+
&self, stream: StreamToken, reg: Token, poll_registry: &Registry,
17601750
) {
17611751
match stream {
17621752
FIRST_SESSION..=LAST_SESSION => {
17631753
if let Some(session) = self.sessions.get(stream) {
17641754
session
17651755
.write()
1766-
.register_socket(reg, event_loop)
1756+
.register_socket(reg, poll_registry)
17671757
.expect("Error registering socket");
17681758
}
17691759
}
17701760
TCP_ACCEPT => {
1771-
event_loop.registry()
1761+
poll_registry
17721762
.register(
17731763
&mut *self.tcp_listener.lock(),
17741764
Token(TCP_ACCEPT),
@@ -1777,7 +1767,7 @@ impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
17771767
.expect("Error registering stream");
17781768
}
17791769
UDP_MESSAGE => {
1780-
event_loop.registry()
1770+
poll_registry
17811771
.register(
17821772
&mut *self.udp_socket.lock(),
17831773
reg,
@@ -1789,13 +1779,13 @@ impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
17891779
}
17901780
}
17911781

1792-
fn deregister_stream(&self, stream: StreamToken, event_loop: &Poll) {
1782+
fn deregister_stream(&self, stream: StreamToken, poll_registry: &Registry) {
17931783
match stream {
17941784
FIRST_SESSION..=LAST_SESSION => {
17951785
if let Some(session) = self.sessions.get(stream) {
17961786
let mut sess = session.write();
17971787
if sess.expired() {
1798-
sess.deregister_socket(event_loop)
1788+
sess.deregister_socket(poll_registry)
17991789
.expect("Error deregistering socket");
18001790
if let Some(node_id) = sess.id() {
18011791
self.node_db.write().note_failure(
@@ -1813,18 +1803,18 @@ impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
18131803
}
18141804

18151805
fn update_stream(
1816-
&self, stream: StreamToken, reg: Token, event_loop: &Poll,
1806+
&self, stream: StreamToken, reg: Token, poll_registry: &Registry,
18171807
) {
18181808
match stream {
18191809
FIRST_SESSION..=LAST_SESSION => {
18201810
if let Some(session) = self.sessions.get(stream) {
18211811
session
18221812
.write()
1823-
.update_socket(reg, event_loop)
1813+
.update_socket(reg, poll_registry)
18241814
.expect("Error updating socket");
18251815
}
18261816
}
1827-
TCP_ACCEPT => event_loop.registry()
1817+
TCP_ACCEPT => poll_registry
18281818
.reregister(
18291819
&mut *self.tcp_listener.lock(),
18301820
Token(TCP_ACCEPT),
@@ -1840,7 +1830,7 @@ impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
18401830
} else {
18411831
Interest::READABLE
18421832
};
1843-
event_loop.registry()
1833+
poll_registry
18441834
.reregister(
18451835
&mut *udp_socket,
18461836
reg,

crates/network/src/session.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use diem_crypto::{bls::BLS_PUBLIC_KEY_LENGTH, ValidCryptoMaterial};
1717
use diem_types::validator_config::{ConsensusPublicKey, ConsensusVRFPublicKey};
1818
use io::{IoContext, StreamToken};
1919
use log::{debug, trace};
20-
use mio::{net::TcpStream, Poll, Token};
20+
use mio::{net::TcpStream, Registry, Token};
2121
use priority_send_queue::SendQueuePriority;
2222
use rlp::{Rlp, RlpStream};
2323
use serde::Deserialize;
@@ -177,28 +177,28 @@ impl Session {
177177
/// Register event loop for the underlying connection.
178178
/// If session expired, no effect taken.
179179
pub fn register_socket(
180-
&mut self, reg: Token, event_loop: &Poll,
180+
&mut self, reg: Token, poll_registry: &Registry,
181181
) -> Result<(), Error> {
182182
if !self.expired() {
183-
self.connection_mut().register_socket(reg, event_loop)?;
183+
self.connection_mut().register_socket(reg, poll_registry)?;
184184
}
185185

186186
Ok(())
187187
}
188188

189189
/// Update the event loop for the underlying connection.
190190
pub fn update_socket(
191-
&mut self, reg: Token, event_loop: &Poll,
191+
&mut self, reg: Token, poll_registry: &Registry,
192192
) -> Result<(), Error> {
193-
self.connection_mut().update_socket(reg, event_loop)?;
193+
self.connection_mut().update_socket(reg, poll_registry)?;
194194
Ok(())
195195
}
196196

197197
/// Deregister the event loop for the underlying connection.
198198
pub fn deregister_socket(
199-
&mut self, event_loop: &Poll,
199+
&mut self, poll_registry: &Registry,
200200
) -> Result<(), Error> {
201-
self.connection_mut().deregister_socket(event_loop)?;
201+
self.connection_mut().deregister_socket(poll_registry)?;
202202
Ok(())
203203
}
204204

crates/util/io/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ version = "0.1.0"
77
edition = "2021"
88

99
[dependencies]
10-
mio = { workspace = true }
10+
mio = { workspace = true, features = ["os-ext", "net", "os-poll"] }
1111
crossbeam-deque = { workspace = true }
1212
crossbeam-channel = { workspace = true }
1313
parking_lot = { workspace = true }

crates/util/io/src/lib.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub use crate::service_mio::{
3131
TimerToken, TOKENS_PER_HANDLER,
3232
};
3333

34-
use mio::{Poll, Token};
34+
use mio::{Registry, Token};
3535
use mio_util::NotifyError;
3636
use std::{cell::Cell, env, error, fmt, io};
3737

@@ -99,22 +99,21 @@ where Message: Send + Sync + 'static
9999
fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
100100
/// Register a new stream with the event loop
101101
fn register_stream(
102-
&self, _stream: StreamToken, _reg: Token, _event_loop: &Poll,
102+
&self, _stream: StreamToken, _reg: Token, _registry: &Registry,
103103
) {
104104
}
105105
/// Re-register a stream with the event loop
106106
fn update_stream(
107-
&self, _stream: StreamToken, _reg: Token, _event_loop: &Poll,
107+
&self, _stream: StreamToken, _reg: Token, _registry: &Registry,
108108
) {
109109
}
110110
/// Deregister a stream. Called when stream is removed from event loop
111-
fn deregister_stream(&self, _stream: StreamToken, _event_loop: &Poll) {}
111+
fn deregister_stream(&self, _stream: StreamToken, _registry: &Registry) {}
112112
}
113113

114114
#[cfg(test)]
115115
mod tests {
116116
use super::*;
117-
use parking_lot::Mutex;
118117
use std::{
119118
sync::{atomic, Arc},
120119
thread,
@@ -143,9 +142,8 @@ mod tests {
143142
}
144143

145144
let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false)));
146-
let poll = Arc::new(Mutex::new(Poll::new().unwrap()));
147145

148-
let service = IoService::<MyMessage>::start(poll)
146+
let service = IoService::<MyMessage>::start(123)
149147
.expect("Error creating network service");
150148
service.register_handler(handler.clone()).unwrap();
151149

@@ -178,9 +176,8 @@ mod tests {
178176
}
179177

180178
let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false)));
181-
let poll = Arc::new(Mutex::new(Poll::new().unwrap()));
182179

183-
let service = IoService::<MyMessage>::start(poll)
180+
let service = IoService::<MyMessage>::start(123)
184181
.expect("Error creating network service");
185182
service.register_handler(handler.clone()).unwrap();
186183

@@ -210,9 +207,8 @@ mod tests {
210207
}
211208

212209
let handler = Arc::new(MyHandler(atomic::AtomicUsize::new(0)));
213-
let poll = Arc::new(Mutex::new(Poll::new().unwrap()));
214210

215-
let service = IoService::<MyMessage>::start(poll)
211+
let service = IoService::<MyMessage>::start(123)
216212
.expect("Error creating network service");
217213
service.register_handler(handler.clone()).unwrap();
218214

0 commit comments

Comments
 (0)