Skip to content

Commit 11d96b7

Browse files
committed
test send IOCP
1 parent aaaf15e commit 11d96b7

File tree

2 files changed

+177
-0
lines changed

2 files changed

+177
-0
lines changed

core/src/net/operator/windows.rs renamed to core/src/net/operator/windows/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ use windows_sys::Win32::System::IO::{
1919
CreateIoCompletionPort, GetQueuedCompletionStatusEx, OVERLAPPED, OVERLAPPED_ENTRY,
2020
};
2121

22+
#[cfg(test)]
23+
mod tests;
24+
2225
#[repr(C)]
2326
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
2427
pub(crate) struct SocketContext {
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
use crate::net::operator::Operator;
2+
use slab::Slab;
3+
use std::io::{BufRead, BufReader, Write};
4+
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
5+
use std::os::windows::io::AsRawSocket;
6+
use std::sync::atomic::{AtomicBool, Ordering};
7+
use std::sync::Arc;
8+
use std::time::Duration;
9+
use windows_sys::Win32::Networking::WinSock::{closesocket, SOCKET};
10+
11+
#[derive(Clone, Debug)]
12+
enum Token {
13+
Accept,
14+
Read {
15+
fd: SOCKET,
16+
buf_index: usize,
17+
},
18+
Write {
19+
fd: SOCKET,
20+
buf_index: usize,
21+
offset: usize,
22+
len: usize,
23+
},
24+
}
25+
26+
fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
27+
//等服务端起来
28+
while !server_started.load(Ordering::Acquire) {}
29+
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
30+
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
31+
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{port} failed !"));
32+
let mut data: [u8; 512] = [b'1'; 512];
33+
data[511] = b'\n';
34+
let mut buffer: Vec<u8> = Vec::with_capacity(512);
35+
for _ in 0..3 {
36+
//写入stream流,如果写入失败,提示"写入失败"
37+
assert_eq!(512, stream.write(&data).expect("Failed to write!"));
38+
print!("Client Send: {}", String::from_utf8_lossy(&data[..]));
39+
40+
let mut reader = BufReader::new(&stream);
41+
//一直读到换行为止(b'\n'中的b表示字节),读到buffer里面
42+
assert_eq!(
43+
512,
44+
reader
45+
.read_until(b'\n', &mut buffer)
46+
.expect("Failed to read into buffer")
47+
);
48+
print!("Client Received: {}", String::from_utf8_lossy(&buffer[..]));
49+
assert_eq!(&data, &buffer as &[u8]);
50+
buffer.clear();
51+
}
52+
//发送终止符
53+
assert_eq!(1, stream.write(&[b'e']).expect("Failed to write!"));
54+
println!("client closed");
55+
}
56+
57+
fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
58+
let operator = Operator::new(0)?;
59+
let listener = TcpListener::bind(("127.0.0.1", port))?;
60+
61+
let mut bufpool = Vec::with_capacity(64);
62+
let mut buf_alloc = Slab::with_capacity(64);
63+
let mut token_alloc = Slab::with_capacity(64);
64+
65+
println!("listen {}", listener.local_addr()?);
66+
server_started.store(true, Ordering::Release);
67+
68+
operator.accept(
69+
token_alloc.insert(Token::Accept),
70+
listener.as_raw_socket() as _,
71+
std::ptr::null_mut(),
72+
std::ptr::null_mut(),
73+
)?;
74+
75+
loop {
76+
let (_, mut cq, _) = operator.select(None, 1)?;
77+
for cqe in &mut cq {
78+
let token_index = cqe.token;
79+
let token = &mut token_alloc[token_index];
80+
match token.clone() {
81+
Token::Accept => {
82+
println!("accept");
83+
let fd = cqe.socket;
84+
let (buf_index, buf) = match bufpool.pop() {
85+
Some(buf_index) => (buf_index, &mut buf_alloc[buf_index]),
86+
None => {
87+
let buf = vec![0u8; 2048].into_boxed_slice();
88+
let buf_entry = buf_alloc.vacant_entry();
89+
let buf_index = buf_entry.key();
90+
(buf_index, buf_entry.insert(buf))
91+
}
92+
};
93+
*token = Token::Read { fd, buf_index };
94+
operator.recv(token_index, fd, buf.as_mut_ptr() as _, buf.len() as _, 0)?;
95+
}
96+
Token::Read { fd, buf_index } => {
97+
let ret = cqe.bytes_transferred as _;
98+
if ret == 0 {
99+
bufpool.push(buf_index);
100+
_ = token_alloc.remove(token_index);
101+
println!("shutdown connection");
102+
_ = unsafe { closesocket(fd) };
103+
println!("Server closed");
104+
return Ok(());
105+
} else {
106+
let len = ret;
107+
let buf = &buf_alloc[buf_index];
108+
*token = Token::Write {
109+
fd,
110+
buf_index,
111+
len,
112+
offset: 0,
113+
};
114+
operator.send(token_index, fd, buf.as_ptr() as _, len as _, 0)?;
115+
}
116+
}
117+
Token::Write {
118+
fd,
119+
buf_index,
120+
offset,
121+
len,
122+
} => {
123+
let write_len = cqe.bytes_transferred as usize;
124+
if offset + write_len >= len {
125+
bufpool.push(buf_index);
126+
let (buf_index, buf) = match bufpool.pop() {
127+
Some(buf_index) => (buf_index, &mut buf_alloc[buf_index]),
128+
None => {
129+
let buf = vec![0u8; 2048].into_boxed_slice();
130+
let buf_entry = buf_alloc.vacant_entry();
131+
let buf_index = buf_entry.key();
132+
(buf_index, buf_entry.insert(buf))
133+
}
134+
};
135+
*token = Token::Read { fd, buf_index };
136+
operator.recv(token_index, fd, buf.as_mut_ptr() as _, buf.len() as _, 0)?;
137+
} else {
138+
let offset = offset + write_len;
139+
let len = len - offset;
140+
let buf = &buf_alloc[buf_index][offset..];
141+
*token = Token::Write {
142+
fd,
143+
buf_index,
144+
offset,
145+
len,
146+
};
147+
operator.write(token_index, fd as _, buf.as_ptr() as _, len as _)?;
148+
};
149+
}
150+
}
151+
}
152+
}
153+
}
154+
155+
#[test]
156+
fn framework() -> anyhow::Result<()> {
157+
#[cfg(feature = "log")]
158+
let _ = tracing_subscriber::fmt()
159+
.with_thread_names(true)
160+
.with_line_number(true)
161+
.with_timer(tracing_subscriber::fmt::time::OffsetTime::new(
162+
time::UtcOffset::from_hms(8, 0, 0).expect("create UtcOffset failed !"),
163+
time::format_description::well_known::Rfc2822,
164+
))
165+
.try_init();
166+
let port = 7061;
167+
let server_started = Arc::new(AtomicBool::new(false));
168+
let clone = server_started.clone();
169+
let handle = std::thread::spawn(move || crate_server2(port, clone));
170+
std::thread::spawn(move || crate_client(port, server_started))
171+
.join()
172+
.expect("client has error");
173+
handle.join().expect("server has error")
174+
}

0 commit comments

Comments
 (0)