Skip to content

Commit 368b6b5

Browse files
committed
fix(driver,iocp): remove cancelled set because improssible to cancel before push
1 parent b8eb431 commit 368b6b5

File tree

1 file changed

+24
-39
lines changed

1 file changed

+24
-39
lines changed

compio-driver/src/iocp/mod.rs

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{HashMap, HashSet},
2+
collections::HashMap,
33
io,
44
os::{
55
raw::c_void,
@@ -17,7 +17,7 @@ use std::{
1717

1818
use compio_log::{instrument, trace};
1919
use windows_sys::Win32::{
20-
Foundation::{ERROR_BUSY, ERROR_OPERATION_ABORTED, ERROR_TIMEOUT, WAIT_OBJECT_0, WAIT_TIMEOUT},
20+
Foundation::{ERROR_BUSY, ERROR_TIMEOUT, WAIT_OBJECT_0, WAIT_TIMEOUT},
2121
Networking::WinSock::{WSACleanup, WSAStartup, WSADATA},
2222
System::{
2323
Threading::{
@@ -170,7 +170,6 @@ pub trait OpCode {
170170
pub(crate) struct Driver {
171171
port: cp::Port,
172172
waits: HashMap<usize, WinThreadpollWait>,
173-
cancelled: HashSet<usize>,
174173
pool: AsyncifyPool,
175174
notify_overlapped: Arc<Overlapped>,
176175
}
@@ -186,7 +185,6 @@ impl Driver {
186185
Ok(Self {
187186
port,
188187
waits: HashMap::default(),
189-
cancelled: HashSet::default(),
190188
pool: builder.create_or_get_thread_pool(),
191189
notify_overlapped: Arc::new(Overlapped::new(driver)),
192190
})
@@ -203,8 +201,6 @@ impl Driver {
203201
pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) {
204202
instrument!(compio_log::Level::TRACE, "cancel", ?op);
205203
trace!("cancel RawOp");
206-
let user_data = op.user_data();
207-
self.cancelled.insert(user_data);
208204
let overlapped_ptr = op.as_mut_ptr();
209205
let op = op.as_op_pin();
210206
// It's OK to fail to cancel.
@@ -215,32 +211,25 @@ impl Driver {
215211
pub fn push<T: OpCode + 'static>(&mut self, op: &mut Key<T>) -> Poll<io::Result<usize>> {
216212
instrument!(compio_log::Level::TRACE, "push", ?op);
217213
let user_data = op.user_data();
218-
if self.cancelled.remove(&user_data) {
219-
trace!("pushed RawOp already cancelled");
220-
Poll::Ready(Err(io::Error::from_raw_os_error(
221-
ERROR_OPERATION_ABORTED as _,
222-
)))
223-
} else {
224-
trace!("push RawOp");
225-
let optr = op.as_mut_ptr();
226-
let op_pin = op.as_op_pin();
227-
match op_pin.op_type() {
228-
OpType::Overlapped => unsafe { op_pin.operate(optr.cast()) },
229-
OpType::Blocking => {
230-
if self.push_blocking(user_data)? {
231-
Poll::Pending
232-
} else {
233-
Poll::Ready(Err(io::Error::from_raw_os_error(ERROR_BUSY as _)))
234-
}
235-
}
236-
OpType::Event(e) => {
237-
self.waits.insert(
238-
user_data,
239-
WinThreadpollWait::new(self.port.handle(), e, op)?,
240-
);
214+
trace!("push RawOp");
215+
let optr = op.as_mut_ptr();
216+
let op_pin = op.as_op_pin();
217+
match op_pin.op_type() {
218+
OpType::Overlapped => unsafe { op_pin.operate(optr.cast()) },
219+
OpType::Blocking => {
220+
if self.push_blocking(user_data)? {
241221
Poll::Pending
222+
} else {
223+
Poll::Ready(Err(io::Error::from_raw_os_error(ERROR_BUSY as _)))
242224
}
243225
}
226+
OpType::Event(e) => {
227+
self.waits.insert(
228+
user_data,
229+
WinThreadpollWait::new(self.port.handle(), e, op)?,
230+
);
231+
Poll::Pending
232+
}
244233
}
245234
}
246235

@@ -259,19 +248,13 @@ impl Driver {
259248

260249
fn create_entry(
261250
notify_user_data: usize,
262-
cancelled: &mut HashSet<usize>,
263251
waits: &mut HashMap<usize, WinThreadpollWait>,
264252
entry: Entry,
265253
) -> Option<Entry> {
266254
let user_data = entry.user_data();
267255
if user_data != notify_user_data {
268256
waits.remove(&user_data);
269-
let result = if cancelled.remove(&user_data) {
270-
Err(io::Error::from_raw_os_error(ERROR_OPERATION_ABORTED as _))
271-
} else {
272-
entry.into_result()
273-
};
274-
Some(Entry::new(user_data, result))
257+
Some(Entry::new(user_data, entry.into_result()))
275258
} else {
276259
None
277260
}
@@ -286,9 +269,11 @@ impl Driver {
286269

287270
let notify_user_data = self.notify_overlapped.as_ref() as *const Overlapped as usize;
288271

289-
entries.extend(self.port.poll(timeout)?.filter_map(|e| {
290-
Self::create_entry(notify_user_data, &mut self.cancelled, &mut self.waits, e)
291-
}));
272+
entries.extend(
273+
self.port
274+
.poll(timeout)?
275+
.filter_map(|e| Self::create_entry(notify_user_data, &mut self.waits, e)),
276+
);
292277

293278
Ok(())
294279
}

0 commit comments

Comments
 (0)