Skip to content

Commit 8244491

Browse files
committed
Merge branch 'master' into dev-iocp4
# Conflicts: # core/src/net/mod.rs # core/src/syscall/windows/connect.rs # core/src/syscall/windows/mod.rs
2 parents cce5961 + 7572bdd commit 8244491

File tree

14 files changed

+452
-104
lines changed

14 files changed

+452
-104
lines changed

core/src/common/constants.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ pub enum Syscall {
125125
SetFilePointerEx,
126126
#[cfg(windows)]
127127
WaitOnAddress,
128+
#[cfg(windows)]
129+
WSAPoll,
128130
}
129131

130132
impl Syscall {

core/src/net/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,16 @@ pub type UserFunc = extern "C" fn(usize) -> usize;
3939

4040
mod selector;
4141

42-
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
42+
#[allow(
43+
clippy::cast_possible_truncation,
44+
clippy::cast_sign_loss,
45+
clippy::too_many_arguments
46+
)]
4347
#[cfg(any(
4448
all(target_os = "linux", feature = "io_uring"),
4549
all(windows, feature = "iocp")
4650
))]
47-
pub(crate) mod operator;
51+
mod operator;
4852

4953
#[allow(missing_docs)]
5054
pub mod event_loop;
@@ -138,7 +142,7 @@ impl EventLoops {
138142
}
139143

140144
/// Get a `EventLoop`, prefer current.
141-
pub(crate) fn event_loop() -> &'static EventLoop<'static> {
145+
fn event_loop() -> &'static EventLoop<'static> {
142146
EventLoop::current().unwrap_or_else(|| Self::round_robin())
143147
}
144148

core/src/syscall/unix/connect.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::common::now;
12
use crate::net::EventLoops;
2-
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_errno, set_non_blocking};
3+
use crate::syscall::common::{is_blocking, reset_errno, send_time_limit, set_blocking, set_errno, set_non_blocking};
34
use libc::{sockaddr, socklen_t};
45
use once_cell::sync::Lazy;
56
use std::ffi::{c_int, c_void};
@@ -61,22 +62,29 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
6162
if blocking {
6263
set_non_blocking(fd);
6364
}
65+
let start_time = now();
66+
let mut left_time = send_time_limit(fd);
6467
let mut r = self.inner.connect(fn_ptr, fd, address, len);
65-
loop {
68+
while left_time > 0 {
6669
if r == 0 {
6770
reset_errno();
6871
break;
6972
}
7073
let errno = Error::last_os_error().raw_os_error();
71-
if errno == Some(libc::EINPROGRESS) || errno == Some(libc::EALREADY) {
74+
if errno == Some(libc::EINPROGRESS) || errno == Some(libc::EALREADY) || errno == Some(libc::EWOULDBLOCK) {
7275
//阻塞,直到写事件发生
73-
if EventLoops::wait_write_event(fd, Some(crate::common::constants::SLICE)).is_err()
76+
left_time = start_time
77+
.saturating_add(send_time_limit(fd))
78+
.saturating_sub(now());
79+
let wait_time = std::time::Duration::from_nanos(left_time)
80+
.min(crate::common::constants::SLICE);
81+
if EventLoops::wait_write_event(fd, Some(wait_time)).is_err()
7482
{
7583
break;
7684
}
7785
let mut err = 0;
7886
unsafe {
79-
let mut len: socklen_t = std::mem::zeroed();
87+
let mut len = socklen_t::try_from(size_of_val(&err)).expect("overflow");
8088
r = libc::getsockopt(
8189
fd,
8290
libc::SOL_SOCKET,
@@ -96,7 +104,7 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
96104
};
97105
unsafe {
98106
let mut address = std::mem::zeroed();
99-
let mut address_len = std::mem::zeroed();
107+
let mut address_len = socklen_t::try_from(size_of_val(&address)).expect("overflow");
100108
r = libc::getpeername(fd, &mut address, &mut address_len);
101109
}
102110
} else if errno != Some(libc::EINTR) {

core/src/syscall/unix/mod.rs

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ macro_rules! impl_facade {
3333
$crate::error!("{} change to running state failed !", co.name());
3434
}
3535
}
36-
$crate::info!("exit syscall {}", syscall);
36+
$crate::info!("exit syscall {} {:?} {}", syscall, r, std::io::Error::last_os_error());
3737
r
3838
}
3939
}
@@ -131,8 +131,9 @@ macro_rules! impl_nio_read {
131131
$crate::syscall::common::set_non_blocking($fd);
132132
}
133133
let start_time = $crate::common::now();
134-
let mut r;
135-
loop {
134+
let mut left_time = $crate::syscall::common::recv_time_limit($fd);
135+
let mut r = -1;
136+
while left_time > 0 {
136137
r = self.inner.$syscall(fn_ptr, $fd, $($arg, )*);
137138
if r != -1 {
138139
$crate::syscall::common::reset_errno();
@@ -141,9 +142,10 @@ macro_rules! impl_nio_read {
141142
let error_kind = std::io::Error::last_os_error().kind();
142143
if error_kind == std::io::ErrorKind::WouldBlock {
143144
//wait read event
144-
let wait_time = std::time::Duration::from_nanos(start_time
145+
left_time = start_time
145146
.saturating_add($crate::syscall::common::recv_time_limit($fd))
146-
.saturating_sub($crate::common::now()))
147+
.saturating_sub($crate::common::now());
148+
let wait_time = std::time::Duration::from_nanos(left_time)
147149
.min($crate::common::constants::SLICE);
148150
if $crate::net::EventLoops::wait_read_event(
149151
$fd,
@@ -187,9 +189,10 @@ macro_rules! impl_nio_read_buf {
187189
$crate::syscall::common::set_non_blocking($fd);
188190
}
189191
let start_time = $crate::common::now();
192+
let mut left_time = $crate::syscall::common::recv_time_limit($fd);
190193
let mut received = 0;
191194
let mut r = 0;
192-
while received < $len {
195+
while received < $len && left_time > 0 {
193196
r = self.inner.$syscall(
194197
fn_ptr,
195198
$fd,
@@ -208,9 +211,10 @@ macro_rules! impl_nio_read_buf {
208211
let error_kind = std::io::Error::last_os_error().kind();
209212
if error_kind == std::io::ErrorKind::WouldBlock {
210213
//wait read event
211-
let wait_time = std::time::Duration::from_nanos(start_time
214+
left_time = start_time
212215
.saturating_add($crate::syscall::common::recv_time_limit($fd))
213-
.saturating_sub($crate::common::now()))
216+
.saturating_sub($crate::common::now());
217+
let wait_time = std::time::Duration::from_nanos(left_time)
214218
.min($crate::common::constants::SLICE);
215219
if $crate::net::EventLoops::wait_read_event(
216220
$fd,
@@ -253,8 +257,15 @@ macro_rules! impl_nio_read_iovec {
253257
if blocking {
254258
$crate::syscall::common::set_non_blocking($fd);
255259
}
256-
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
257260
let start_time = $crate::common::now();
261+
let mut left_time = $crate::syscall::common::recv_time_limit($fd);
262+
let vec = unsafe {
263+
Vec::from_raw_parts(
264+
$iov.cast_mut(),
265+
$iovcnt as usize,
266+
$iovcnt as usize
267+
)
268+
};
258269
let mut length = 0;
259270
let mut received = 0usize;
260271
let mut r = 0;
@@ -270,7 +281,7 @@ macro_rules! impl_nio_read_iovec {
270281
for i in vec.iter().skip(index) {
271282
arg.push(*i);
272283
}
273-
while received < length {
284+
while received < length && left_time > 0 {
274285
if 0 != offset {
275286
arg[0] = libc::iovec {
276287
iov_base: (arg[0].iov_base as usize + offset) as *mut std::ffi::c_void,
@@ -304,9 +315,10 @@ macro_rules! impl_nio_read_iovec {
304315
let error_kind = std::io::Error::last_os_error().kind();
305316
if error_kind == std::io::ErrorKind::WouldBlock {
306317
//wait read event
307-
let wait_time = std::time::Duration::from_nanos(start_time
318+
left_time = start_time
308319
.saturating_add($crate::syscall::common::recv_time_limit($fd))
309-
.saturating_sub($crate::common::now()))
320+
.saturating_sub($crate::common::now());
321+
let wait_time = std::time::Duration::from_nanos(left_time)
310322
.min($crate::common::constants::SLICE);
311323
if $crate::net::EventLoops::wait_read_event(
312324
$fd,
@@ -363,9 +375,10 @@ macro_rules! impl_nio_write_buf {
363375
$crate::syscall::common::set_non_blocking($fd);
364376
}
365377
let start_time = $crate::common::now();
378+
let mut left_time = $crate::syscall::common::send_time_limit($fd);
366379
let mut sent = 0;
367380
let mut r = 0;
368-
while sent < $len {
381+
while sent < $len && left_time > 0 {
369382
r = self.inner.$syscall(
370383
fn_ptr,
371384
$fd,
@@ -384,9 +397,10 @@ macro_rules! impl_nio_write_buf {
384397
let error_kind = std::io::Error::last_os_error().kind();
385398
if error_kind == std::io::ErrorKind::WouldBlock {
386399
//wait write event
387-
let wait_time = std::time::Duration::from_nanos(start_time
400+
left_time = start_time
388401
.saturating_add($crate::syscall::common::send_time_limit($fd))
389-
.saturating_sub($crate::common::now()))
402+
.saturating_sub($crate::common::now());
403+
let wait_time = std::time::Duration::from_nanos(left_time)
390404
.min($crate::common::constants::SLICE);
391405
if $crate::net::EventLoops::wait_write_event(
392406
$fd,
@@ -431,8 +445,15 @@ macro_rules! impl_nio_write_iovec {
431445
if blocking {
432446
$crate::syscall::common::set_non_blocking($fd);
433447
}
434-
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
435448
let start_time = $crate::common::now();
449+
let mut left_time = $crate::syscall::common::send_time_limit($fd);
450+
let vec = unsafe {
451+
Vec::from_raw_parts(
452+
$iov.cast_mut(),
453+
$iovcnt as usize,
454+
$iovcnt as usize
455+
)
456+
};
436457
let mut length = 0;
437458
let mut sent = 0usize;
438459
let mut r = 0;
@@ -448,7 +469,7 @@ macro_rules! impl_nio_write_iovec {
448469
for i in vec.iter().skip(index) {
449470
arg.push(*i);
450471
}
451-
while sent < length {
472+
while sent < length && left_time > 0 {
452473
if 0 != offset {
453474
arg[0] = libc::iovec {
454475
iov_base: (arg[0].iov_base as usize + offset) as *mut std::ffi::c_void,
@@ -476,9 +497,10 @@ macro_rules! impl_nio_write_iovec {
476497
let error_kind = std::io::Error::last_os_error().kind();
477498
if error_kind == std::io::ErrorKind::WouldBlock {
478499
//wait write event
479-
let wait_time = std::time::Duration::from_nanos(start_time
500+
left_time = start_time
480501
.saturating_add($crate::syscall::common::send_time_limit($fd))
481-
.saturating_sub($crate::common::now()))
502+
.saturating_sub($crate::common::now());
503+
let wait_time = std::time::Duration::from_nanos(left_time)
482504
.min($crate::common::constants::SLICE);
483505
if $crate::net::EventLoops::wait_write_event(
484506
$fd,
@@ -670,16 +692,21 @@ pub extern "C" fn send_time_limit(fd: c_int) -> u64 {
670692
|| unsafe {
671693
let mut tv: libc::timeval = std::mem::zeroed();
672694
let mut len = size_of::<libc::timeval>() as libc::socklen_t;
673-
assert_eq!(
674-
0,
675-
libc::getsockopt(
676-
fd,
677-
libc::SOL_SOCKET,
678-
libc::SO_SNDTIMEO,
679-
std::ptr::from_mut(&mut tv).cast(),
680-
&mut len,
681-
)
682-
);
695+
if libc::getsockopt(
696+
fd,
697+
libc::SOL_SOCKET,
698+
libc::SO_SNDTIMEO,
699+
std::ptr::from_mut(&mut tv).cast(),
700+
&mut len,
701+
) == -1
702+
{
703+
let error = std::io::Error::last_os_error();
704+
if Some(libc::ENOTSOCK) == error.raw_os_error() {
705+
// not a socket
706+
return u64::MAX;
707+
}
708+
panic!("getsockopt failed: {error}");
709+
}
683710
let mut time_limit = (tv.tv_sec as u64)
684711
.saturating_mul(1_000_000_000)
685712
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));
@@ -700,16 +727,21 @@ pub extern "C" fn recv_time_limit(fd: c_int) -> u64 {
700727
|| unsafe {
701728
let mut tv: libc::timeval = std::mem::zeroed();
702729
let mut len = size_of::<libc::timeval>() as libc::socklen_t;
703-
assert_eq!(
704-
0,
705-
libc::getsockopt(
706-
fd,
707-
libc::SOL_SOCKET,
708-
libc::SO_RCVTIMEO,
709-
std::ptr::from_mut(&mut tv).cast(),
710-
&mut len,
711-
)
712-
);
730+
if libc::getsockopt(
731+
fd,
732+
libc::SOL_SOCKET,
733+
libc::SO_RCVTIMEO,
734+
std::ptr::from_mut(&mut tv).cast(),
735+
&mut len,
736+
) == -1
737+
{
738+
let error = std::io::Error::last_os_error();
739+
if Some(libc::ENOTSOCK) == error.raw_os_error() {
740+
// not a socket
741+
return u64::MAX;
742+
}
743+
panic!("getsockopt failed: {error}");
744+
}
713745
let mut time_limit = (tv.tv_sec as u64)
714746
.saturating_mul(1_000_000_000)
715747
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));

core/src/syscall/unix/poll.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl<I: PollSyscall> PollSyscall for NioPollSyscall<I> {
4747
let mut t = if timeout < 0 { c_int::MAX } else { timeout };
4848
let mut x = 1;
4949
let mut r;
50-
// just check select every x ms
50+
// just check poll every x ms
5151
loop {
5252
r = self.inner.poll(fn_ptr, fds, nfds, 0);
5353
if r != 0 || t == 0 {

core/src/syscall/unix/recvfrom.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,34 @@ trait RecvfromSyscall {
4242
}
4343

4444
impl_facade!(RecvfromSyscallFacade, RecvfromSyscall,
45-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
46-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
45+
recvfrom(
46+
fd: c_int,
47+
buf: *mut c_void,
48+
len: size_t,
49+
flags: c_int,
50+
addr: *mut sockaddr,
51+
addrlen: *mut socklen_t
52+
) -> ssize_t
4753
);
4854

4955
impl_nio_read_buf!(NioRecvfromSyscall, RecvfromSyscall,
50-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
51-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
56+
recvfrom(
57+
fd: c_int,
58+
buf: *mut c_void,
59+
len: size_t,
60+
flags: c_int,
61+
addr: *mut sockaddr,
62+
addrlen: *mut socklen_t
63+
) -> ssize_t
5264
);
5365

5466
impl_raw!(RawRecvfromSyscall, RecvfromSyscall,
55-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
56-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
67+
recvfrom(
68+
fd: c_int,
69+
buf: *mut c_void,
70+
len: size_t,
71+
flags: c_int,
72+
addr: *mut sockaddr,
73+
addrlen: *mut socklen_t
74+
) -> ssize_t
5775
);

0 commit comments

Comments
 (0)