Skip to content

Commit 03d0223

Browse files
authored
hook connect (#325)
2 parents 5ee597c + cc14ac1 commit 03d0223

File tree

8 files changed

+146
-74
lines changed

8 files changed

+146
-74
lines changed

core/src/syscall/unix/connect.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::common::now;
12
use crate::net::EventLoops;
2-
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_errno, set_non_blocking};
3+
use crate::syscall::common::{is_blocking, reset_errno, send_time_limit, set_blocking, set_errno, set_non_blocking};
34
use libc::{sockaddr, socklen_t};
45
use once_cell::sync::Lazy;
56
use std::ffi::{c_int, c_void};
@@ -61,22 +62,26 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
6162
if blocking {
6263
set_non_blocking(fd);
6364
}
65+
let start_time = now();
6466
let mut r = self.inner.connect(fn_ptr, fd, address, len);
65-
loop {
67+
while start_time
68+
.saturating_add(send_time_limit(fd))
69+
.saturating_sub(now()) > 0
70+
{
6671
if r == 0 {
6772
reset_errno();
6873
break;
6974
}
7075
let errno = Error::last_os_error().raw_os_error();
71-
if errno == Some(libc::EINPROGRESS) || errno == Some(libc::EALREADY) {
76+
if errno == Some(libc::EINPROGRESS) || errno == Some(libc::EALREADY) || errno == Some(libc::EWOULDBLOCK) {
7277
//阻塞,直到写事件发生
7378
if EventLoops::wait_write_event(fd, Some(crate::common::constants::SLICE)).is_err()
7479
{
7580
break;
7681
}
7782
let mut err = 0;
7883
unsafe {
79-
let mut len: socklen_t = std::mem::zeroed();
84+
let mut len = socklen_t::try_from(size_of_val(&err)).expect("overflow");
8085
r = libc::getsockopt(
8186
fd,
8287
libc::SOL_SOCKET,
@@ -96,7 +101,7 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
96101
};
97102
unsafe {
98103
let mut address = std::mem::zeroed();
99-
let mut address_len = std::mem::zeroed();
104+
let mut address_len = socklen_t::try_from(size_of_val(&address)).expect("overflow");
100105
r = libc::getpeername(fd, &mut address, &mut address_len);
101106
}
102107
} else if errno != Some(libc::EINTR) {

core/src/syscall/unix/mod.rs

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,9 @@ macro_rules! impl_nio_read {
131131
$crate::syscall::common::set_non_blocking($fd);
132132
}
133133
let start_time = $crate::common::now();
134-
let mut r;
135-
loop {
134+
let mut left_time = $crate::syscall::common::recv_time_limit($fd);
135+
let mut r = -1;
136+
while left_time > 0 {
136137
r = self.inner.$syscall(fn_ptr, $fd, $($arg, )*);
137138
if r != -1 {
138139
$crate::syscall::common::reset_errno();
@@ -141,9 +142,10 @@ macro_rules! impl_nio_read {
141142
let error_kind = std::io::Error::last_os_error().kind();
142143
if error_kind == std::io::ErrorKind::WouldBlock {
143144
//wait read event
144-
let wait_time = std::time::Duration::from_nanos(start_time
145+
left_time = start_time
145146
.saturating_add($crate::syscall::common::recv_time_limit($fd))
146-
.saturating_sub($crate::common::now()))
147+
.saturating_sub($crate::common::now());
148+
let wait_time = std::time::Duration::from_nanos(left_time)
147149
.min($crate::common::constants::SLICE);
148150
if $crate::net::EventLoops::wait_read_event(
149151
$fd,
@@ -187,9 +189,10 @@ macro_rules! impl_nio_read_buf {
187189
$crate::syscall::common::set_non_blocking($fd);
188190
}
189191
let start_time = $crate::common::now();
192+
let mut left_time = $crate::syscall::common::recv_time_limit($fd);
190193
let mut received = 0;
191194
let mut r = 0;
192-
while received < $len {
195+
while received < $len && left_time > 0 {
193196
r = self.inner.$syscall(
194197
fn_ptr,
195198
$fd,
@@ -208,9 +211,10 @@ macro_rules! impl_nio_read_buf {
208211
let error_kind = std::io::Error::last_os_error().kind();
209212
if error_kind == std::io::ErrorKind::WouldBlock {
210213
//wait read event
211-
let wait_time = std::time::Duration::from_nanos(start_time
214+
left_time = start_time
212215
.saturating_add($crate::syscall::common::recv_time_limit($fd))
213-
.saturating_sub($crate::common::now()))
216+
.saturating_sub($crate::common::now());
217+
let wait_time = std::time::Duration::from_nanos(left_time)
214218
.min($crate::common::constants::SLICE);
215219
if $crate::net::EventLoops::wait_read_event(
216220
$fd,
@@ -253,8 +257,9 @@ macro_rules! impl_nio_read_iovec {
253257
if blocking {
254258
$crate::syscall::common::set_non_blocking($fd);
255259
}
256-
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
257260
let start_time = $crate::common::now();
261+
let mut left_time = $crate::syscall::common::recv_time_limit($fd);
262+
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
258263
let mut length = 0;
259264
let mut received = 0usize;
260265
let mut r = 0;
@@ -270,7 +275,7 @@ macro_rules! impl_nio_read_iovec {
270275
for i in vec.iter().skip(index) {
271276
arg.push(*i);
272277
}
273-
while received < length {
278+
while received < length && left_time > 0 {
274279
if 0 != offset {
275280
arg[0] = libc::iovec {
276281
iov_base: (arg[0].iov_base as usize + offset) as *mut std::ffi::c_void,
@@ -304,9 +309,10 @@ macro_rules! impl_nio_read_iovec {
304309
let error_kind = std::io::Error::last_os_error().kind();
305310
if error_kind == std::io::ErrorKind::WouldBlock {
306311
//wait read event
307-
let wait_time = std::time::Duration::from_nanos(start_time
312+
left_time = start_time
308313
.saturating_add($crate::syscall::common::recv_time_limit($fd))
309-
.saturating_sub($crate::common::now()))
314+
.saturating_sub($crate::common::now());
315+
let wait_time = std::time::Duration::from_nanos(left_time)
310316
.min($crate::common::constants::SLICE);
311317
if $crate::net::EventLoops::wait_read_event(
312318
$fd,
@@ -363,9 +369,10 @@ macro_rules! impl_nio_write_buf {
363369
$crate::syscall::common::set_non_blocking($fd);
364370
}
365371
let start_time = $crate::common::now();
372+
let mut left_time = $crate::syscall::common::send_time_limit($fd);
366373
let mut sent = 0;
367374
let mut r = 0;
368-
while sent < $len {
375+
while sent < $len && left_time > 0 {
369376
r = self.inner.$syscall(
370377
fn_ptr,
371378
$fd,
@@ -384,9 +391,10 @@ macro_rules! impl_nio_write_buf {
384391
let error_kind = std::io::Error::last_os_error().kind();
385392
if error_kind == std::io::ErrorKind::WouldBlock {
386393
//wait write event
387-
let wait_time = std::time::Duration::from_nanos(start_time
394+
left_time = start_time
388395
.saturating_add($crate::syscall::common::send_time_limit($fd))
389-
.saturating_sub($crate::common::now()))
396+
.saturating_sub($crate::common::now());
397+
let wait_time = std::time::Duration::from_nanos(left_time)
390398
.min($crate::common::constants::SLICE);
391399
if $crate::net::EventLoops::wait_write_event(
392400
$fd,
@@ -431,8 +439,9 @@ macro_rules! impl_nio_write_iovec {
431439
if blocking {
432440
$crate::syscall::common::set_non_blocking($fd);
433441
}
434-
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
435442
let start_time = $crate::common::now();
443+
let mut left_time = $crate::syscall::common::send_time_limit($fd);
444+
let vec = unsafe { Vec::from_raw_parts($iov.cast_mut(), $iovcnt as usize, $iovcnt as usize) };
436445
let mut length = 0;
437446
let mut sent = 0usize;
438447
let mut r = 0;
@@ -448,7 +457,7 @@ macro_rules! impl_nio_write_iovec {
448457
for i in vec.iter().skip(index) {
449458
arg.push(*i);
450459
}
451-
while sent < length {
460+
while sent < length && left_time > 0 {
452461
if 0 != offset {
453462
arg[0] = libc::iovec {
454463
iov_base: (arg[0].iov_base as usize + offset) as *mut std::ffi::c_void,
@@ -476,9 +485,10 @@ macro_rules! impl_nio_write_iovec {
476485
let error_kind = std::io::Error::last_os_error().kind();
477486
if error_kind == std::io::ErrorKind::WouldBlock {
478487
//wait write event
479-
let wait_time = std::time::Duration::from_nanos(start_time
488+
left_time = start_time
480489
.saturating_add($crate::syscall::common::send_time_limit($fd))
481-
.saturating_sub($crate::common::now()))
490+
.saturating_sub($crate::common::now());
491+
let wait_time = std::time::Duration::from_nanos(left_time)
482492
.min($crate::common::constants::SLICE);
483493
if $crate::net::EventLoops::wait_write_event(
484494
$fd,
@@ -670,16 +680,21 @@ pub extern "C" fn send_time_limit(fd: c_int) -> u64 {
670680
|| unsafe {
671681
let mut tv: libc::timeval = std::mem::zeroed();
672682
let mut len = size_of::<libc::timeval>() as libc::socklen_t;
673-
assert_eq!(
674-
0,
675-
libc::getsockopt(
676-
fd,
677-
libc::SOL_SOCKET,
678-
libc::SO_SNDTIMEO,
679-
std::ptr::from_mut(&mut tv).cast(),
680-
&mut len,
681-
)
682-
);
683+
if libc::getsockopt(
684+
fd,
685+
libc::SOL_SOCKET,
686+
libc::SO_SNDTIMEO,
687+
std::ptr::from_mut(&mut tv).cast(),
688+
&mut len,
689+
) == -1
690+
{
691+
let error = std::io::Error::last_os_error();
692+
if Some(libc::ENOTSOCK) == error.raw_os_error() {
693+
// not a socket
694+
return u64::MAX;
695+
}
696+
panic!("getsockopt failed: {error}");
697+
}
683698
let mut time_limit = (tv.tv_sec as u64)
684699
.saturating_mul(1_000_000_000)
685700
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));
@@ -700,16 +715,21 @@ pub extern "C" fn recv_time_limit(fd: c_int) -> u64 {
700715
|| unsafe {
701716
let mut tv: libc::timeval = std::mem::zeroed();
702717
let mut len = size_of::<libc::timeval>() as libc::socklen_t;
703-
assert_eq!(
704-
0,
705-
libc::getsockopt(
706-
fd,
707-
libc::SOL_SOCKET,
708-
libc::SO_RCVTIMEO,
709-
std::ptr::from_mut(&mut tv).cast(),
710-
&mut len,
711-
)
712-
);
718+
if libc::getsockopt(
719+
fd,
720+
libc::SOL_SOCKET,
721+
libc::SO_RCVTIMEO,
722+
std::ptr::from_mut(&mut tv).cast(),
723+
&mut len,
724+
) == -1
725+
{
726+
let error = std::io::Error::last_os_error();
727+
if Some(libc::ENOTSOCK) == error.raw_os_error() {
728+
// not a socket
729+
return u64::MAX;
730+
}
731+
panic!("getsockopt failed: {error}");
732+
}
713733
let mut time_limit = (tv.tv_sec as u64)
714734
.saturating_mul(1_000_000_000)
715735
.saturating_add((tv.tv_usec as u64).saturating_mul(1_000));

core/src/syscall/unix/recvfrom.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,34 @@ trait RecvfromSyscall {
4242
}
4343

4444
impl_facade!(RecvfromSyscallFacade, RecvfromSyscall,
45-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
46-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
45+
recvfrom(
46+
fd: c_int,
47+
buf: *mut c_void,
48+
len: size_t,
49+
flags: c_int,
50+
addr: *mut sockaddr,
51+
addrlen: *mut socklen_t
52+
) -> ssize_t
4753
);
4854

4955
impl_nio_read_buf!(NioRecvfromSyscall, RecvfromSyscall,
50-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
51-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
56+
recvfrom(
57+
fd: c_int,
58+
buf: *mut c_void,
59+
len: size_t,
60+
flags: c_int,
61+
addr: *mut sockaddr,
62+
addrlen: *mut socklen_t
63+
) -> ssize_t
5264
);
5365

5466
impl_raw!(RawRecvfromSyscall, RecvfromSyscall,
55-
recvfrom(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int,
56-
addr: *mut sockaddr, addrlen: *mut socklen_t) -> ssize_t
67+
recvfrom(
68+
fd: c_int,
69+
buf: *mut c_void,
70+
len: size_t,
71+
flags: c_int,
72+
addr: *mut sockaddr,
73+
addrlen: *mut socklen_t
74+
) -> ssize_t
5775
);

core/src/syscall/unix/recvmsg.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::common::now;
12
use crate::net::EventLoops;
2-
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking};
3+
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking, recv_time_limit};
34
use libc::{msghdr, ssize_t};
45
use once_cell::sync::Lazy;
56
use std::ffi::{c_int, c_void};
@@ -62,6 +63,8 @@ impl<I: RecvmsgSyscall> RecvmsgSyscall for NioRecvmsgSyscall<I> {
6263
if blocking {
6364
set_non_blocking(fd);
6465
}
66+
let start_time = now();
67+
let mut left_time = recv_time_limit(fd);
6568
let msghdr = unsafe { *msg };
6669
let vec = unsafe {
6770
Vec::from_raw_parts(
@@ -99,7 +102,7 @@ impl<I: RecvmsgSyscall> RecvmsgSyscall for NioRecvmsgSyscall<I> {
99102
});
100103
}
101104
}
102-
while received < length {
105+
while received < length && left_time > 0 {
103106
if 0 != offset {
104107
iov[0] = libc::iovec {
105108
iov_base: (iov[0].iov_base as usize + offset) as *mut c_void,
@@ -134,9 +137,12 @@ impl<I: RecvmsgSyscall> RecvmsgSyscall for NioRecvmsgSyscall<I> {
134137
let error_kind = Error::last_os_error().kind();
135138
if error_kind == ErrorKind::WouldBlock {
136139
//wait read event
137-
if EventLoops::wait_read_event(fd, Some(crate::common::constants::SLICE))
138-
.is_err()
139-
{
140+
left_time = start_time
141+
.saturating_add(recv_time_limit(fd))
142+
.saturating_sub(now());
143+
let wait_time = std::time::Duration::from_nanos(left_time)
144+
.min(crate::common::constants::SLICE);
145+
if EventLoops::wait_read_event(fd, Some(wait_time)).is_err() {
140146
std::mem::forget(vec);
141147
if blocking {
142148
set_blocking(fd);

core/src/syscall/unix/sendmsg.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::common::now;
12
use crate::net::EventLoops;
2-
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking};
3+
use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking, send_time_limit};
34
use libc::{msghdr, ssize_t};
45
use once_cell::sync::Lazy;
56
use std::ffi::{c_int, c_void};
@@ -50,6 +51,7 @@ struct NioSendmsgSyscall<I: SendmsgSyscall> {
5051
}
5152

5253
impl<I: SendmsgSyscall> SendmsgSyscall for NioSendmsgSyscall<I> {
54+
#[allow(clippy::too_many_lines)]
5355
extern "C" fn sendmsg(
5456
&self,
5557
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,
@@ -61,6 +63,8 @@ impl<I: SendmsgSyscall> SendmsgSyscall for NioSendmsgSyscall<I> {
6163
if blocking {
6264
set_non_blocking(fd);
6365
}
66+
let start_time = now();
67+
let mut left_time = send_time_limit(fd);
6468
let msghdr = unsafe { *msg };
6569
let vec = unsafe {
6670
Vec::from_raw_parts(
@@ -98,7 +102,7 @@ impl<I: SendmsgSyscall> SendmsgSyscall for NioSendmsgSyscall<I> {
98102
});
99103
}
100104
}
101-
while sent < length {
105+
while sent < length && left_time > 0 {
102106
if 0 != offset {
103107
iov[0] = libc::iovec {
104108
iov_base: (iov[0].iov_base as usize + offset) as *mut c_void,
@@ -127,9 +131,12 @@ impl<I: SendmsgSyscall> SendmsgSyscall for NioSendmsgSyscall<I> {
127131
let error_kind = Error::last_os_error().kind();
128132
if error_kind == ErrorKind::WouldBlock {
129133
//wait write event
130-
if EventLoops::wait_write_event(fd, Some(crate::common::constants::SLICE))
131-
.is_err()
132-
{
134+
left_time = start_time
135+
.saturating_add(send_time_limit(fd))
136+
.saturating_sub(now());
137+
let wait_time = std::time::Duration::from_nanos(left_time)
138+
.min(crate::common::constants::SLICE);
139+
if EventLoops::wait_write_event(fd, Some(wait_time)).is_err() {
133140
std::mem::forget(vec);
134141
if blocking {
135142
set_blocking(fd);

0 commit comments

Comments
 (0)