Skip to content

Commit b742c4c

Browse files
committed
refactor send syscall
1 parent 9a93800 commit b742c4c

File tree

10 files changed

+227
-94
lines changed

10 files changed

+227
-94
lines changed

examples/src/lib.rs

Lines changed: 110 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,127 @@
11
use open_coroutine::task;
2-
use std::io::{BufRead, BufReader, Read, Write};
3-
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
2+
use std::io::{BufRead, BufReader, ErrorKind, IoSlice, IoSliceMut, Read, Write};
3+
use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
44
use std::sync::atomic::{AtomicBool, Ordering};
55
use std::sync::{Arc, Condvar, Mutex};
66
use std::time::Duration;
77

8-
fn crate_co(input: i32) {
8+
fn crate_task(input: i32) {
99
_ = task!(
1010
|_, param| {
11-
println!("[coroutine] launched param:{}", param);
11+
println!("[coroutine{}] launched", param);
1212
},
1313
input,
1414
);
1515
}
1616

17+
pub fn start_server<A: ToSocketAddrs>(
18+
addr: A,
19+
server_finished: Arc<(Mutex<bool>, Condvar)>,
20+
) -> std::io::Result<()> {
21+
let listener = TcpListener::bind(addr)?;
22+
for stream in listener.incoming() {
23+
let mut stream = stream?;
24+
let mut buffer1 = [0; 256];
25+
for _ in 0..3 {
26+
assert_eq!(12, stream.read(&mut buffer1)?);
27+
println!("Server Received: {}", String::from_utf8_lossy(&buffer1));
28+
assert_eq!(256, stream.write(&buffer1)?);
29+
println!("Server Send");
30+
}
31+
let mut buffer2 = [0; 256];
32+
for _ in 0..3 {
33+
let mut buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)];
34+
assert_eq!(26, stream.read_vectored(&mut buffers)?);
35+
println!(
36+
"Server Received Multiple: {}{}",
37+
String::from_utf8_lossy(&buffer1),
38+
String::from_utf8_lossy(&buffer2)
39+
);
40+
let responses = [IoSlice::new(&buffer1), IoSlice::new(&buffer2)];
41+
assert_eq!(512, stream.write_vectored(&responses)?);
42+
println!("Server Send Multiple");
43+
}
44+
println!("Server Shutdown Write");
45+
stream.shutdown(Shutdown::Write).map(|()| {
46+
println!("Server Closed Connection");
47+
})?;
48+
let (lock, cvar) = &*server_finished;
49+
let mut pending = lock.lock().unwrap();
50+
*pending = false;
51+
cvar.notify_one();
52+
}
53+
Ok(())
54+
}
55+
56+
pub fn start_client<A: ToSocketAddrs>(addr: A) -> std::io::Result<()> {
57+
let mut stream = connect_timeout(addr, Duration::from_secs(3))?;
58+
let mut buffer1 = [0; 256];
59+
for i in 0..3 {
60+
assert_eq!(12, stream.write(format!("RequestPart{i}").as_ref())?);
61+
println!("Client Send");
62+
assert_eq!(256, stream.read(&mut buffer1)?);
63+
println!("Client Received: {}", String::from_utf8_lossy(&buffer1));
64+
}
65+
let mut buffer2 = [0; 256];
66+
for i in 0..3 {
67+
let request1 = format!("RequestPart{i}1");
68+
let request2 = format!("RequestPart{i}2");
69+
let requests = [
70+
IoSlice::new(request1.as_ref()),
71+
IoSlice::new(request2.as_ref()),
72+
];
73+
assert_eq!(26, stream.write_vectored(&requests)?);
74+
println!("Client Send Multiple");
75+
let mut buffers = [IoSliceMut::new(&mut buffer1), IoSliceMut::new(&mut buffer2)];
76+
assert_eq!(512, stream.read_vectored(&mut buffers)?);
77+
println!(
78+
"Client Received Multiple: {}{}",
79+
String::from_utf8_lossy(&buffer1),
80+
String::from_utf8_lossy(&buffer2)
81+
);
82+
}
83+
println!("Client Shutdown Write");
84+
stream.shutdown(Shutdown::Write).map(|()| {
85+
println!("Client Closed");
86+
})
87+
}
88+
89+
fn connect_timeout<A: ToSocketAddrs>(addr: A, timeout: Duration) -> std::io::Result<TcpStream> {
90+
let mut last_err = None;
91+
for addr in addr.to_socket_addrs()? {
92+
match TcpStream::connect_timeout(&addr, timeout) {
93+
Ok(l) => return Ok(l),
94+
Err(e) => last_err = Some(e),
95+
}
96+
}
97+
Err(last_err.unwrap_or_else(|| {
98+
std::io::Error::new(
99+
ErrorKind::InvalidInput,
100+
"could not resolve to any addresses",
101+
)
102+
}))
103+
}
104+
17105
pub fn crate_server(
18106
port: u16,
19107
server_started: Arc<AtomicBool>,
20108
server_finished: Arc<(Mutex<bool>, Condvar)>,
21109
) {
22110
//invoke by libc::listen
23-
crate_co(1);
111+
crate_task(1);
24112
let mut data: [u8; 512] = unsafe { std::mem::zeroed() };
25113
data[511] = b'\n';
26114
let listener = TcpListener::bind(format!("127.0.0.1:{port}"))
27115
.unwrap_or_else(|_| panic!("bind to 127.0.0.1:{port} failed !"));
28116
server_started.store(true, Ordering::Release);
29117
//invoke by libc::accept
30-
crate_co(2);
118+
crate_task(2);
31119
if let Some(stream) = listener.incoming().next() {
32120
let mut stream = stream.expect("accept new connection failed !");
33121
let mut buffer: [u8; 512] = [0; 512];
34122
loop {
35123
//invoke by libc::recv
36-
crate_co(6);
124+
crate_task(6);
37125
//从流里面读内容,读到buffer中
38126
let bytes_read = stream.read(&mut buffer).expect("server read failed !");
39127
if bytes_read == 0 {
@@ -48,13 +136,13 @@ pub fn crate_server(
48136
*pending = false;
49137
cvar.notify_one();
50138
println!("server closed");
51-
crate_co(8);
139+
crate_task(8);
52140
return;
53141
}
54142
assert_eq!(512, bytes_read);
55143
assert_eq!(data, buffer);
56144
//invoke by libc::send
57-
crate_co(7);
145+
crate_task(7);
58146
//回写
59147
assert_eq!(
60148
bytes_read,
@@ -74,7 +162,7 @@ pub fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
74162
//等服务端起来
75163
while !server_started.load(Ordering::Acquire) {}
76164
//invoke by libc::connect
77-
crate_co(3);
165+
crate_task(3);
78166
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
79167
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
80168
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{port} failed !"));
@@ -83,13 +171,13 @@ pub fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
83171
let mut buffer: Vec<u8> = Vec::with_capacity(512);
84172
for _ in 0..3 {
85173
//invoke by libc::send
86-
crate_co(4);
174+
crate_task(4);
87175
//写入stream流,如果写入失败,提示"写入失败"
88176
assert_eq!(512, stream.write(&data).expect("Failed to write!"));
89177
print!("Client Send: {}", String::from_utf8_lossy(&data[..]));
90178

91179
//invoke by libc::recv
92-
crate_co(5);
180+
crate_task(5);
93181
let mut reader = BufReader::new(&stream);
94182
//一直读到换行为止(b'\n'中的b表示字节),读到buffer里面
95183
assert_eq!(
@@ -105,7 +193,7 @@ pub fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
105193
//发送终止符
106194
assert_eq!(1, stream.write(&[b'e']).expect("Failed to write!"));
107195
println!("client closed");
108-
crate_co(8);
196+
crate_task(8);
109197
}
110198

111199
pub fn crate_co_server(
@@ -114,22 +202,22 @@ pub fn crate_co_server(
114202
server_finished: Arc<(Mutex<bool>, Condvar)>,
115203
) {
116204
//invoke by libc::listen
117-
crate_co(11);
205+
crate_task(11);
118206
let mut data: [u8; 512] = unsafe { std::mem::zeroed() };
119207
data[511] = b'\n';
120208
let listener = TcpListener::bind(format!("127.0.0.1:{port}"))
121209
.unwrap_or_else(|_| panic!("bind to 127.0.0.1:{port} failed !"));
122210
server_started.store(true, Ordering::Release);
123211
//invoke by libc::accept
124-
crate_co(12);
212+
crate_task(12);
125213
for stream in listener.incoming() {
126214
_ = task!(
127215
|_, input| {
128216
let mut stream = input.expect("accept new connection failed !");
129217
let mut buffer: [u8; 512] = [0; 512];
130218
loop {
131219
//invoke by libc::recv
132-
crate_co(16);
220+
crate_task(16);
133221
//从流里面读内容,读到buffer中
134222
let bytes_read = stream
135223
.read(&mut buffer)
@@ -149,13 +237,13 @@ pub fn crate_co_server(
149237
*pending = false;
150238
cvar.notify_one();
151239
println!("coroutine server closed");
152-
crate_co(18);
240+
crate_task(18);
153241
return Some(Box::leak(Box::new(stream)));
154242
}
155243
assert_eq!(512, bytes_read);
156244
assert_eq!(data, buffer);
157245
//invoke by libc::send
158-
crate_co(17);
246+
crate_task(17);
159247
//回写
160248
assert_eq!(
161249
bytes_read,
@@ -180,7 +268,7 @@ pub fn crate_co_client(port: u16, server_started: Arc<AtomicBool>) {
180268
_ = task!(
181269
|_, input| {
182270
//invoke by libc::connect
183-
crate_co(13);
271+
crate_task(13);
184272
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), input);
185273
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
186274
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{input} failed !"));
@@ -189,7 +277,7 @@ pub fn crate_co_client(port: u16, server_started: Arc<AtomicBool>) {
189277
let mut buffer: Vec<u8> = Vec::with_capacity(512);
190278
for _ in 0..3 {
191279
//invoke by libc::send
192-
crate_co(14);
280+
crate_task(14);
193281
//写入stream流,如果写入失败,提示"写入失败"
194282
assert_eq!(512, stream.write(&data).expect("Failed to write!"));
195283
print!(
@@ -198,7 +286,7 @@ pub fn crate_co_client(port: u16, server_started: Arc<AtomicBool>) {
198286
);
199287

200288
//invoke by libc::recv
201-
crate_co(15);
289+
crate_task(15);
202290
let mut reader = BufReader::new(&stream);
203291
//一直读到换行为止(b'\n'中的b表示字节),读到buffer里面
204292
assert_eq!(
@@ -217,7 +305,7 @@ pub fn crate_co_client(port: u16, server_started: Arc<AtomicBool>) {
217305
//发送终止符
218306
assert_eq!(1, stream.write(&[b'e']).expect("Failed to write!"));
219307
println!("coroutine client closed");
220-
crate_co(18);
308+
crate_task(18);
221309
},
222310
port,
223311
);

open-coroutine-core/src/monitor/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,7 @@ impl Monitor {
166166

167167
#[allow(dead_code)]
168168
pub fn stop() {
169-
assert_eq!(
170-
MonitorState::Running,
171-
Self::get_instance().state.replace(MonitorState::Stopping)
172-
);
169+
Self::get_instance().state.set(MonitorState::Stopping);
173170
cfg_if::cfg_if! {
174171
if #[cfg(feature = "net")] {
175172
let pair = EventLoops::new_condition();

open-coroutine-core/src/syscall/facade.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,6 @@ pub extern "C" fn recvmsg(
131131

132132
/// write
133133
134-
#[must_use]
135-
pub extern "C" fn send(
136-
fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, c_int) -> ssize_t>,
137-
socket: c_int,
138-
buf: *const c_void,
139-
len: size_t,
140-
flags: c_int,
141-
) -> ssize_t {
142-
CHAIN.send(fn_ptr, socket, buf, len, flags)
143-
}
144-
145134
#[must_use]
146135
pub extern "C" fn sendto(
147136
fn_ptr: Option<

open-coroutine-core/src/syscall/io_uring.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,6 @@ impl<I: UnixSyscall> UnixSyscall for IoUringLinuxSyscall<I> {
136136
impl_io_uring!(self, recvmsg, fn_ptr, fd, msg, flags)
137137
}
138138

139-
extern "C" fn send(
140-
&self,
141-
fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, c_int) -> ssize_t>,
142-
socket: c_int,
143-
buf: *const c_void,
144-
len: size_t,
145-
flags: c_int,
146-
) -> ssize_t {
147-
impl_io_uring!(self, send, fn_ptr, socket, buf, len, flags)
148-
}
149-
150139
extern "C" fn sendto(
151140
&self,
152141
fn_ptr: Option<

open-coroutine-core/src/syscall/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,6 @@ pub trait UnixSyscall {
127127

128128
/// write
129129
130-
extern "C" fn send(
131-
&self,
132-
fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, c_int) -> ssize_t>,
133-
fd: c_int,
134-
buf: *const c_void,
135-
len: size_t,
136-
flags: c_int,
137-
) -> ssize_t;
138-
139130
extern "C" fn sendto(
140131
&self,
141132
fn_ptr: Option<

open-coroutine-core/src/syscall/nio.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -527,17 +527,6 @@ impl<I: UnixSyscall> UnixSyscall for NioLinuxSyscall<I> {
527527
r
528528
}
529529

530-
extern "C" fn send(
531-
&self,
532-
fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, c_int) -> ssize_t>,
533-
socket: c_int,
534-
buf: *const c_void,
535-
len: size_t,
536-
flags: c_int,
537-
) -> ssize_t {
538-
impl_expected_write_hook!(self.inner, send, fn_ptr, socket, buf, len, flags)
539-
}
540-
541530
extern "C" fn sendto(
542531
&self,
543532
fn_ptr: Option<

open-coroutine-core/src/syscall/raw.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -163,21 +163,6 @@ impl UnixSyscall for RawLinuxSyscall {
163163

164164
/// write
165165
166-
extern "C" fn send(
167-
&self,
168-
fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, c_int) -> ssize_t>,
169-
socket: c_int,
170-
buf: *const c_void,
171-
len: size_t,
172-
flags: c_int,
173-
) -> ssize_t {
174-
if let Some(f) = fn_ptr {
175-
(f)(socket, buf, len, flags)
176-
} else {
177-
unsafe { libc::send(socket, buf, len, flags) }
178-
}
179-
}
180-
181166
extern "C" fn sendto(
182167
&self,
183168
fn_ptr: Option<

open-coroutine-core/src/syscall/state.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,17 +146,6 @@ impl<I: UnixSyscall> UnixSyscall for StateLinuxSyscall<I> {
146146
syscall_state!(self, recvmsg, fn_ptr, fd, msg, flags)
147147
}
148148

149-
extern "C" fn send(
150-
&self,
151-
fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, c_int) -> ssize_t>,
152-
socket: c_int,
153-
buf: *const c_void,
154-
len: size_t,
155-
flags: c_int,
156-
) -> ssize_t {
157-
syscall_state!(self, send, fn_ptr, socket, buf, len, flags)
158-
}
159-
160149
extern "C" fn sendto(
161150
&self,
162151
fn_ptr: Option<

0 commit comments

Comments
 (0)