Skip to content

Commit 8584ae7

Browse files
authored
refactor preadv/pwritev syscall (#304)
2 parents 2665b77 + ff75352 commit 8584ae7

File tree

10 files changed

+118
-290
lines changed

10 files changed

+118
-290
lines changed

open-coroutine-core/src/net/event_loop/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ static EVENT_LOOP_WORKERS: OnceCell<Box<[std::thread::JoinHandle<()>]>> = OnceCe
106106

107107
static EVENT_LOOP_STARTED: Lazy<AtomicBool> = Lazy::new(AtomicBool::default);
108108

109+
static EVENT_LOOP_START_COUNT: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
110+
109111
static EVENT_LOOP_STOP: Lazy<Arc<(Mutex<AtomicUsize>, Condvar)>> =
110112
Lazy::new(|| Arc::new((Mutex::new(AtomicUsize::new(0)), Condvar::new())));
111113

@@ -151,6 +153,7 @@ impl EventLoops {
151153
.name(format!("open-coroutine-event-loop-{i}"))
152154
.spawn(move || {
153155
warn!("open-coroutine-event-loop-{i} has started");
156+
_ = EVENT_LOOP_START_COUNT.fetch_add(1, Ordering::Release);
154157
if set_for_current(CoreId { id: i }) {
155158
warn!("pin event-loop-{i} thread to CPU core-{i} failed !");
156159
}
@@ -185,7 +188,10 @@ impl EventLoops {
185188
.wait_timeout_while(
186189
lock.lock().unwrap(),
187190
Duration::from_millis(30000),
188-
|stopped| stopped.load(Ordering::Acquire) < unsafe { EVENT_LOOPS.len() } - 1,
191+
|stopped| {
192+
stopped.load(Ordering::Acquire)
193+
< EVENT_LOOP_START_COUNT.load(Ordering::Acquire) - 1
194+
},
189195
)
190196
.unwrap()
191197
.1;

open-coroutine-core/src/syscall/facade.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::syscall::LinuxSyscall;
66
use crate::syscall::UnixSyscall;
77
#[cfg(target_os = "linux")]
88
use libc::epoll_event;
9-
use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
9+
use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
1010
use once_cell::sync::Lazy;
1111
use std::ffi::{c_int, c_void};
1212

@@ -44,17 +44,6 @@ pub extern "C" fn pread(
4444
CHAIN.pread(fn_ptr, fd, buf, count, offset)
4545
}
4646

47-
#[must_use]
48-
pub extern "C" fn preadv(
49-
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
50-
fd: c_int,
51-
iov: *const iovec,
52-
iovcnt: c_int,
53-
offset: off_t,
54-
) -> ssize_t {
55-
CHAIN.preadv(fn_ptr, fd, iov, iovcnt, offset)
56-
}
57-
5847
#[must_use]
5948
pub extern "C" fn recvmsg(
6049
fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>,
@@ -103,17 +92,6 @@ pub extern "C" fn pwrite(
10392
CHAIN.pwrite(fn_ptr, fd, buf, count, offset)
10493
}
10594

106-
#[must_use]
107-
pub extern "C" fn pwritev(
108-
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
109-
fd: c_int,
110-
iov: *const iovec,
111-
iovcnt: c_int,
112-
offset: off_t,
113-
) -> ssize_t {
114-
CHAIN.pwritev(fn_ptr, fd, iov, iovcnt, offset)
115-
}
116-
11795
#[must_use]
11896
pub extern "C" fn sendmsg(
11997
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,

open-coroutine-core/src/syscall/io_uring.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::syscall::LinuxSyscall;
22
use crate::syscall::UnixSyscall;
33
use libc::epoll_event;
4-
use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
4+
use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
55
use std::ffi::{c_int, c_void};
66

77
#[derive(Debug, Default)]
@@ -46,17 +46,6 @@ impl<I: UnixSyscall> UnixSyscall for IoUringLinuxSyscall<I> {
4646
impl_io_uring!(self, pread, fn_ptr, fd, buf, count, offset)
4747
}
4848

49-
extern "C" fn preadv(
50-
&self,
51-
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
52-
fd: c_int,
53-
iov: *const iovec,
54-
iovcnt: c_int,
55-
offset: off_t,
56-
) -> ssize_t {
57-
impl_io_uring!(self, preadv, fn_ptr, fd, iov, iovcnt, offset)
58-
}
59-
6049
extern "C" fn recvmsg(
6150
&self,
6251
fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>,
@@ -110,17 +99,6 @@ impl<I: UnixSyscall> UnixSyscall for IoUringLinuxSyscall<I> {
11099
impl_io_uring!(self, pwrite, fn_ptr, fd, buf, count, offset)
111100
}
112101

113-
extern "C" fn pwritev(
114-
&self,
115-
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
116-
fd: c_int,
117-
iov: *const iovec,
118-
iovcnt: c_int,
119-
offset: off_t,
120-
) -> ssize_t {
121-
impl_io_uring!(self, pwritev, fn_ptr, fd, iov, iovcnt, offset)
122-
}
123-
124102
extern "C" fn sendmsg(
125103
&self,
126104
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,

open-coroutine-core/src/syscall/mod.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#[cfg(target_os = "linux")]
22
use libc::epoll_event;
33
#[cfg(unix)]
4-
use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
4+
use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t};
55
#[cfg(unix)]
66
use std::ffi::{c_int, c_void};
77

@@ -47,15 +47,6 @@ pub trait UnixSyscall {
4747
offset: off_t,
4848
) -> ssize_t;
4949

50-
extern "C" fn preadv(
51-
&self,
52-
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
53-
fd: c_int,
54-
iov: *const iovec,
55-
iovcnt: c_int,
56-
offset: off_t,
57-
) -> ssize_t;
58-
5950
extern "C" fn recvmsg(
6051
&self,
6152
fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>,
@@ -103,15 +94,6 @@ pub trait UnixSyscall {
10394
offset: off_t,
10495
) -> ssize_t;
10596

106-
extern "C" fn pwritev(
107-
&self,
108-
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
109-
fd: c_int,
110-
iov: *const iovec,
111-
iovcnt: c_int,
112-
offset: off_t,
113-
) -> ssize_t;
114-
11597
extern "C" fn sendmsg(
11698
&self,
11799
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,

open-coroutine-core/src/syscall/nio.rs

Lines changed: 0 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -61,80 +61,6 @@ macro_rules! impl_expected_read_hook {
6161
}};
6262
}
6363

64-
macro_rules! impl_expected_batch_read_hook {
65-
( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $iov:expr, $length:expr, $($arg: expr),* $(,)* ) => {{
66-
let socket = $socket;
67-
let blocking = $crate::syscall::common::is_blocking(socket);
68-
if blocking {
69-
$crate::syscall::common::set_non_blocking(socket);
70-
}
71-
let mut vec = std::collections::VecDeque::from(unsafe {
72-
Vec::from_raw_parts($iov.cast_mut(), $length as usize, $length as usize)
73-
});
74-
let mut length = 0;
75-
let mut pices = std::collections::VecDeque::new();
76-
for iovec in &vec {
77-
length += iovec.iov_len;
78-
pices.push_back(length);
79-
}
80-
let mut received = 0;
81-
let mut r = 0;
82-
while received < length {
83-
// find from-index
84-
let mut from_index = 0;
85-
for (i, v) in pices.iter().enumerate() {
86-
if received < *v {
87-
from_index = i;
88-
break;
89-
}
90-
}
91-
// calculate offset
92-
let current_received_offset = if from_index > 0 {
93-
received.saturating_sub(pices[from_index.saturating_sub(1)])
94-
} else {
95-
received
96-
};
97-
// remove already received
98-
for _ in 0..from_index {
99-
_ = vec.pop_front();
100-
_ = pices.pop_front();
101-
}
102-
// build syscall args
103-
vec[0] = iovec {
104-
iov_base: (vec[0].iov_base as usize + current_received_offset) as *mut c_void,
105-
iov_len: vec[0].iov_len - current_received_offset,
106-
};
107-
r = $invoker.$syscall($fn_ptr, $socket, vec.get(0).unwrap(), c_int::try_from(vec.len()).unwrap(), $($arg, )*);
108-
if r != -1 {
109-
$crate::syscall::common::reset_errno();
110-
received += r as usize;
111-
if received >= length || r == 0 {
112-
r = received as ssize_t;
113-
break;
114-
}
115-
}
116-
let error_kind = std::io::Error::last_os_error().kind();
117-
if error_kind == std::io::ErrorKind::WouldBlock {
118-
//wait read event
119-
if $crate::net::event_loop::EventLoops::wait_read_event(
120-
socket,
121-
Some(std::time::Duration::from_millis(10)),
122-
)
123-
.is_err()
124-
{
125-
break;
126-
}
127-
} else if error_kind != std::io::ErrorKind::Interrupted {
128-
break;
129-
}
130-
}
131-
if blocking {
132-
$crate::syscall::common::set_blocking(socket);
133-
}
134-
r
135-
}};
136-
}
137-
13864
macro_rules! impl_expected_write_hook {
13965
( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $buffer:expr, $length:expr, $($arg: expr),* $(,)* ) => {{
14066
let socket = $socket;
@@ -182,80 +108,6 @@ macro_rules! impl_expected_write_hook {
182108
}};
183109
}
184110

185-
macro_rules! impl_expected_batch_write_hook {
186-
( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $iov:expr, $length:expr, $($arg: expr),* $(,)* ) => {{
187-
let socket = $socket;
188-
let blocking = $crate::syscall::common::is_blocking(socket);
189-
if blocking {
190-
$crate::syscall::common::set_non_blocking(socket);
191-
}
192-
let mut vec = std::collections::VecDeque::from(unsafe {
193-
Vec::from_raw_parts($iov.cast_mut(), $length as usize, $length as usize)
194-
});
195-
let mut length = 0;
196-
let mut pices = std::collections::VecDeque::new();
197-
for iovec in &vec {
198-
length += iovec.iov_len;
199-
pices.push_back(length);
200-
}
201-
let mut sent = 0;
202-
let mut r = 0;
203-
while sent < length {
204-
// find from-index
205-
let mut from_index = 0;
206-
for (i, v) in pices.iter().enumerate() {
207-
if sent < *v {
208-
from_index = i;
209-
break;
210-
}
211-
}
212-
// calculate offset
213-
let current_sent_offset = if from_index > 0 {
214-
sent.saturating_sub(pices[from_index.saturating_sub(1)])
215-
} else {
216-
sent
217-
};
218-
// remove already sent
219-
for _ in 0..from_index {
220-
_ = vec.pop_front();
221-
_ = pices.pop_front();
222-
}
223-
// build syscall args
224-
vec[0] = iovec {
225-
iov_base: (vec[0].iov_base as usize + current_sent_offset) as *mut c_void,
226-
iov_len: vec[0].iov_len - current_sent_offset,
227-
};
228-
r = $invoker.$syscall($fn_ptr, $socket, vec.get(0).unwrap(), c_int::try_from(vec.len()).unwrap(), $($arg, )*);
229-
if r != -1 {
230-
$crate::syscall::common::reset_errno();
231-
sent += r as usize;
232-
if sent >= length {
233-
r = sent as ssize_t;
234-
break;
235-
}
236-
}
237-
let error_kind = std::io::Error::last_os_error().kind();
238-
if error_kind == std::io::ErrorKind::WouldBlock {
239-
//wait write event
240-
if $crate::net::event_loop::EventLoops::wait_write_event(
241-
socket,
242-
Some(std::time::Duration::from_millis(10)),
243-
)
244-
.is_err()
245-
{
246-
break;
247-
}
248-
} else if error_kind != std::io::ErrorKind::Interrupted {
249-
break;
250-
}
251-
}
252-
if blocking {
253-
$crate::syscall::common::set_blocking(socket);
254-
}
255-
r
256-
}};
257-
}
258-
259111
impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
260112
extern "C" fn read(
261113
&self,
@@ -278,17 +130,6 @@ impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
278130
impl_expected_read_hook!(self.inner, pread, fn_ptr, fd, buf, count, offset)
279131
}
280132

281-
extern "C" fn preadv(
282-
&self,
283-
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
284-
fd: c_int,
285-
iov: *const iovec,
286-
iovcnt: c_int,
287-
offset: off_t,
288-
) -> ssize_t {
289-
impl_expected_batch_read_hook!(self.inner, preadv, fn_ptr, fd, iov, iovcnt, offset)
290-
}
291-
292133
extern "C" fn recvmsg(
293134
&self,
294135
fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>,
@@ -432,17 +273,6 @@ impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
432273
impl_expected_write_hook!(self.inner, pwrite, fn_ptr, fd, buf, count, offset)
433274
}
434275

435-
extern "C" fn pwritev(
436-
&self,
437-
fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>,
438-
fd: c_int,
439-
iov: *const iovec,
440-
iovcnt: c_int,
441-
offset: off_t,
442-
) -> ssize_t {
443-
impl_expected_batch_write_hook!(self.inner, pwritev, fn_ptr, fd, iov, iovcnt, offset)
444-
}
445-
446276
extern "C" fn sendmsg(
447277
&self,
448278
fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>,

0 commit comments

Comments
 (0)