Skip to content

Commit 7c55eb5

Browse files
committed
fix connect bug
1 parent 5ee597c commit 7c55eb5

File tree

8 files changed

+205
-93
lines changed

8 files changed

+205
-93
lines changed

core/src/syscall/unix/connect.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::common::now;
12
use crate::net::EventLoops;
2-
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_errno, set_non_blocking};
3+
use crate::syscall::common::{is_blocking, reset_errno, send_time_limit, set_blocking, set_errno, set_non_blocking};
34
use libc::{sockaddr, socklen_t};
45
use once_cell::sync::Lazy;
56
use std::ffi::{c_int, c_void};
@@ -61,22 +62,26 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
6162
if blocking {
6263
set_non_blocking(fd);
6364
}
65+
let start_time = now();
6466
let mut r = self.inner.connect(fn_ptr, fd, address, len);
65-
loop {
67+
while start_time
68+
.saturating_add(send_time_limit(fd))
69+
.saturating_sub(now()) > 0
70+
{
6671
if r == 0 {
6772
reset_errno();
6873
break;
6974
}
7075
let errno = Error::last_os_error().raw_os_error();
71-
if errno == Some(libc::EINPROGRESS) || errno == Some(libc::EALREADY) {
76+
if errno == Some(libc::EINPROGRESS) || errno == Some(libc::EALREADY) || errno == Some(libc::EWOULDBLOCK) {
7277
//阻塞,直到写事件发生
7378
if EventLoops::wait_write_event(fd, Some(crate::common::constants::SLICE)).is_err()
7479
{
7580
break;
7681
}
7782
let mut err = 0;
7883
unsafe {
79-
let mut len: socklen_t = std::mem::zeroed();
84+
let mut len = socklen_t::try_from(size_of_val(&err)).expect("overflow");
8085
r = libc::getsockopt(
8186
fd,
8287
libc::SOL_SOCKET,
@@ -96,7 +101,7 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
96101
};
97102
unsafe {
98103
let mut address = std::mem::zeroed();
99-
let mut address_len = std::mem::zeroed();
104+
let mut address_len = socklen_t::try_from(size_of_val(&address)).expect("overflow");
100105
r = libc::getpeername(fd, &mut address, &mut address_len);
101106
}
102107
} else if errno != Some(libc::EINTR) {

core/src/syscall/unix/mod.rs

Lines changed: 68 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,11 @@ macro_rules! impl_nio_read {
131131
$crate::syscall::common::set_non_blocking($fd);
132132
}
133133
let start_time = $crate::common::now();
134-
let mut r;
135-
loop {
134+
let mut left_time = start_time
135+
.saturating_add($crate::syscall::common::recv_time_limit($fd))
136+
.saturating_sub(start_time);
137+
let mut r = -1;
138+
while left_time > 0 {
136139
r = self.inner.$syscall(fn_ptr, $fd, $($arg, )*);
137140
if r != -1 {
138141
$crate::syscall::common::reset_errno();
@@ -141,9 +144,10 @@ macro_rules! impl_nio_read {
141144
let error_kind = std::io::Error::last_os_error().kind();
142145
if error_kind == std::io::ErrorKind::WouldBlock {
143146
//wait read event
144-
let wait_time = std::time::Duration::from_nanos(start_time
147+
left_time = start_time
145148
.saturating_add($crate::syscall::common::recv_time_limit($fd))
146-
.saturating_sub($crate::common::now()))
149+
.saturating_sub($crate::common::now());
150+
let wait_time = std::time::Duration::from_nanos(left_time)
147151
.min($crate::common::constants::SLICE);
148152
if $crate::net::EventLoops::wait_read_event(
149153
$fd,
@@ -187,9 +191,12 @@ macro_rules! impl_nio_read_buf {
187191
$crate::syscall::common::set_non_blocking($fd);
188192
}
189193
let start_time = $crate::common::now();
194+
let mut left_time = start_time
195+
.saturating_add($crate::syscall::common::recv_time_limit($fd))
196+
.saturating_sub(start_time);
190197
let mut received = 0;
191198
let mut r = 0;
192-
while received < $len {
199+
while received < $len && left_time > 0 {
193200
r = self.inner.$syscall(
194201
fn_ptr,
195202
$fd,
@@ -208,9 +215,10 @@ macro_rules! impl_nio_read_buf {
208215
let error_kind = std::io::Error::last_os_error().kind();
209216
if error_kind == std::io::ErrorKind::WouldBlock {
210217
//wait read event
211-
let wait_time = std::time::Duration::from_nanos(start_time
218+
left_time = start_time
212219
.saturating_add($crate::syscall::common::recv_time_limit($fd))
213-
.saturating_sub($crate::common::now()))
220+
.saturating_sub($crate::common::now());
221+
let wait_time = std::time::Duration::from_nanos(left_time)
214222
.min($crate::common::constants::SLICE);
215223
if $crate::net::EventLoops::wait_read_event(
216224
$fd,
@@ -253,8 +261,11 @@ macro_rules! impl_nio_read_iovec {
253261
if blocking {
254262
$crate::syscall::common::set_non_blocking($fd);
255263
}
256-
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
257264
let start_time = $crate::common::now();
265+
let mut left_time = start_time
266+
.saturating_add($crate::syscall::common::recv_time_limit($fd))
267+
.saturating_sub(start_time);
268+
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
258269
let mut length = 0;
259270
let mut received = 0usize;
260271
let mut r = 0;
@@ -270,7 +281,7 @@ macro_rules! impl_nio_read_iovec {
270281
for i in vec.iter().skip(index) {
271282
arg.push(*i);
272283
}
273-
while received < length {
284+
while received < length && left_time > 0 {
274285
if 0 != offset {
275286
arg[0] = libc::iovec {
276287
iov_base: (arg[0].iov_base as usize + offset) as *mut std::ffi::c_void,
@@ -304,9 +315,10 @@ macro_rules! impl_nio_read_iovec {
304315
let error_kind = std::io::Error::last_os_error().kind();
305316
if error_kind == std::io::ErrorKind::WouldBlock {
306317
//wait read event
307-
let wait_time = std::time::Duration::from_nanos(start_time
318+
left_time = start_time
308319
.saturating_add($crate::syscall::common::recv_time_limit($fd))
309-
.saturating_sub($crate::common::now()))
320+
.saturating_sub($crate::common::now());
321+
let wait_time = std::time::Duration::from_nanos(left_time)
310322
.min($crate::common::constants::SLICE);
311323
if $crate::net::EventLoops::wait_read_event(
312324
$fd,
@@ -363,9 +375,12 @@ macro_rules! impl_nio_write_buf {
363375
$crate::syscall::common::set_non_blocking($fd);
364376
}
365377
let start_time = $crate::common::now();
378+
let mut left_time = start_time
379+
.saturating_add($crate::syscall::common::send_time_limit($fd))
380+
.saturating_sub(start_time);
366381
let mut sent = 0;
367382
let mut r = 0;
368-
while sent < $len {
383+
while sent < $len && left_time > 0 {
369384
r = self.inner.$syscall(
370385
fn_ptr,
371386
$fd,
@@ -384,9 +399,10 @@ macro_rules! impl_nio_write_buf {
384399
let error_kind = std::io::Error::last_os_error().kind();
385400
if error_kind == std::io::ErrorKind::WouldBlock {
386401
//wait write event
387-
let wait_time = std::time::Duration::from_nanos(start_time
402+
left_time = start_time
388403
.saturating_add($crate::syscall::common::send_time_limit($fd))
389-
.saturating_sub($crate::common::now()))
404+
.saturating_sub($crate::common::now());
405+
let wait_time = std::time::Duration::from_nanos(left_time)
390406
.min($crate::common::constants::SLICE);
391407
if $crate::net::EventLoops::wait_write_event(
392408
$fd,
@@ -431,8 +447,11 @@ macro_rules! impl_nio_write_iovec {
431447
if blocking {
432448
$crate::syscall::common::set_non_blocking($fd);
433449
}
434-
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
435450
let start_time = $crate::common::now();
451+
let mut left_time = start_time
452+
.saturating_add($crate::syscall::common::send_time_limit($fd))
453+
.saturating_sub(start_time);
454+
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
436455
let mut length = 0;
437456
let mut sent = 0usize;
438457
let mut r = 0;
@@ -448,7 +467,7 @@ macro_rules! impl_nio_write_iovec {
448467
for i in vec.iter().skip(index) {
449468
arg.push(*i);
450469
}
451-
while sent < length {
470+
while sent < length && left_time > 0 {
452471
if 0 != offset {
453472
arg[0] = libc::iovec {
454473
iov_base: (arg[0].iov_base as usize + offset) as *mut std::ffi::c_void,
@@ -476,9 +495,10 @@ macro_rules! impl_nio_write_iovec {
476495
let error_kind = std::io::Error::last_os_error().kind();
477496
if error_kind == std::io::ErrorKind::WouldBlock {
478497
//wait write event
479-
let wait_time = std::time::Duration::from_nanos(start_time
498+
left_time = start_time
480499
.saturating_add($crate::syscall::common::send_time_limit($fd))
481-
.saturating_sub($crate::common::now()))
500+
.saturating_sub($crate::common::now());
501+
let wait_time = std::time::Duration::from_nanos(left_time)
482502
.min($crate::common::constants::SLICE);
483503
if $crate::net::EventLoops::wait_write_event(
484504
$fd,
@@ -670,16 +690,21 @@ pub extern "C" fn send_time_limit(fd: c_int) -> u64 {
670690
|| unsafe {
671691
let mut tv: libc::timeval = std::mem::zeroed();
672692
let mut len = size_of::<libc::timeval>() as libc::socklen_t;
673-
assert_eq!(
674-
0,
675-
libc::getsockopt(
676-
fd,
677-
libc::SOL_SOCKET,
678-
libc::SO_SNDTIMEO,
679-
std::ptr::from_mut(&mut tv).cast(),
680-
&mut len,
681-
)
682-
);
693+
if libc::getsockopt(
694+
fd,
695+
libc::SOL_SOCKET,
696+
libc::SO_SNDTIMEO,
697+
std::ptr::from_mut(&mut tv).cast(),
698+
&mut len,
699+
) == -1
700+
{
701+
let error = std::io::Error::last_os_error();
702+
if Some(libc::ENOTSOCK) == error.raw_os_error() {
703+
// not a socket
704+
return u64::MAX;
705+
}
706+
panic!("getsockopt failed: {}", error);
707+
}
683708
let mut time_limit = (tv.tv_sec as u64)
684709
.saturating_mul(1_000_000_000)
685710
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));
@@ -700,16 +725,21 @@ pub extern "C" fn recv_time_limit(fd: c_int) -> u64 {
700725
|| unsafe {
701726
let mut tv: libc::timeval = std::mem::zeroed();
702727
let mut len = size_of::<libc::timeval>() as libc::socklen_t;
703-
assert_eq!(
704-
0,
705-
libc::getsockopt(
706-
fd,
707-
libc::SOL_SOCKET,
708-
libc::SO_RCVTIMEO,
709-
std::ptr::from_mut(&mut tv).cast(),
710-
&mut len,
711-
)
712-
);
728+
if libc::getsockopt(
729+
fd,
730+
libc::SOL_SOCKET,
731+
libc::SO_RCVTIMEO,
732+
std::ptr::from_mut(&mut tv).cast(),
733+
&mut len,
734+
) == -1
735+
{
736+
let error = std::io::Error::last_os_error();
737+
if Some(libc::ENOTSOCK) == error.raw_os_error() {
738+
// not a socket
739+
return u64::MAX;
740+
}
741+
panic!("getsockopt failed: {}", error);
742+
}
713743
let mut time_limit = (tv.tv_sec as u64)
714744
.saturating_mul(1_000_000_000)
715745
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));

core/src/syscall/unix/recvfrom.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,34 @@ trait RecvfromSyscall {
4242
}
4343

4444
impl_facade!(RecvfromSyscallFacade, RecvfromSyscall,
45-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
46-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
45+
recvfrom(
46+
fd: c_int,
47+
buf: *mut c_void,
48+
len: size_t,
49+
flags: c_int,
50+
addr: *mut sockaddr,
51+
addrlen: *mut socklen_t
52+
) -> ssize_t
4753
);
4854

4955
impl_nio_read_buf!(NioRecvfromSyscall, RecvfromSyscall,
50-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
51-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
56+
recvfrom(
57+
fd: c_int,
58+
buf: *mut c_void,
59+
len: size_t,
60+
flags: c_int,
61+
addr: *mut sockaddr,
62+
addrlen: *mut socklen_t
63+
) -> ssize_t
5264
);
5365

5466
impl_raw!(RawRecvfromSyscall, RecvfromSyscall,
55-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
56-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
67+
recvfrom(
68+
fd: c_int,
69+
buf: *mut c_void,
70+
len: size_t,
71+
flags: c_int,
72+
addr: *mut sockaddr,
73+
addrlen: *mut socklen_t
74+
) -> ssize_t
5775
);

core/src/syscall/unix/recvmsg.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::common::now;
12
use crate::net::EventLoops;
2-
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking};
3+
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking, recv_time_limit};
34
use libc::{msghdr, ssize_t};
45
use once_cell::sync::Lazy;
56
use std::ffi::{c_int, c_void};
@@ -62,6 +63,10 @@ impl<I: RecvmsgSyscall> RecvmsgSyscall for NioRecvmsgSyscall<I> {
6263
if blocking {
6364
set_non_blocking(fd);
6465
}
66+
let start_time = now();
67+
let mut left_time = start_time
68+
.saturating_add(recv_time_limit(fd))
69+
.saturating_sub(start_time);
6570
let msghdr = unsafe { *msg };
6671
let vec = unsafe {
6772
Vec::from_raw_parts(
@@ -99,7 +104,7 @@ impl<I: RecvmsgSyscall> RecvmsgSyscall for NioRecvmsgSyscall<I> {
99104
});
100105
}
101106
}
102-
while received < length {
107+
while received < length && left_time > 0 {
103108
if 0 != offset {
104109
iov[0] = libc::iovec {
105110
iov_base: (iov[0].iov_base as usize + offset) as *mut c_void,
@@ -134,9 +139,12 @@ impl<I: RecvmsgSyscall> RecvmsgSyscall for NioRecvmsgSyscall<I> {
134139
let error_kind = Error::last_os_error().kind();
135140
if error_kind == ErrorKind::WouldBlock {
136141
//wait read event
137-
if EventLoops::wait_read_event(fd, Some(crate::common::constants::SLICE))
138-
.is_err()
139-
{
142+
left_time = start_time
143+
.saturating_add(recv_time_limit(fd))
144+
.saturating_sub(now());
145+
let wait_time = std::time::Duration::from_nanos(left_time)
146+
.min(crate::common::constants::SLICE);
147+
if EventLoops::wait_read_event(fd, Some(wait_time)).is_err() {
140148
std::mem::forget(vec);
141149
if blocking {
142150
set_blocking(fd);

core/src/syscall/unix/sendmsg.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::common::now;
12
use crate::net::EventLoops;
2-
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking};
3+
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking, send_time_limit};
34
use libc::{msghdr, ssize_t};
45
use once_cell::sync::Lazy;
56
use std::ffi::{c_int, c_void};
@@ -50,6 +51,7 @@ struct NioSendmsgSyscall<I: SendmsgSyscall> {
5051
}
5152

5253
impl<I: SendmsgSyscall> SendmsgSyscall for NioSendmsgSyscall<I> {
54+
#[allow(clippy::too_many_lines)]
5355
extern "C" fn sendmsg(
5456
&self,
5557
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,
@@ -61,6 +63,10 @@ impl<I: SendmsgSyscall> SendmsgSyscall for NioSendmsgSyscall<I> {
6163
if blocking {
6264
set_non_blocking(fd);
6365
}
66+
let start_time = now();
67+
let mut left_time = start_time
68+
.saturating_add(send_time_limit(fd))
69+
.saturating_sub(start_time);
6470
let msghdr = unsafe { *msg };
6571
let vec = unsafe {
6672
Vec::from_raw_parts(
@@ -98,7 +104,7 @@ impl<I: SendmsgSyscall> SendmsgSyscall for NioSendmsgSyscall<I> {
98104
});
99105
}
100106
}
101-
while sent < length {
107+
while sent < length && left_time > 0 {
102108
if 0 != offset {
103109
iov[0] = libc::iovec {
104110
iov_base: (iov[0].iov_base as usize + offset) as *mut c_void,
@@ -127,9 +133,12 @@ impl<I: SendmsgSyscall> SendmsgSyscall for NioSendmsgSyscall<I> {
127133
let error_kind = Error::last_os_error().kind();
128134
if error_kind == ErrorKind::WouldBlock {
129135
//wait write event
130-
if EventLoops::wait_write_event(fd, Some(crate::common::constants::SLICE))
131-
.is_err()
132-
{
136+
left_time = start_time
137+
.saturating_add(send_time_limit(fd))
138+
.saturating_sub(now());
139+
let wait_time = std::time::Duration::from_nanos(left_time)
140+
.min(crate::common::constants::SLICE);
141+
if EventLoops::wait_write_event(fd, Some(wait_time)).is_err() {
133142
std::mem::forget(vec);
134143
if blocking {
135144
set_blocking(fd);

0 commit comments

Comments
 (0)