Skip to content

Commit 8e9d097

Browse files
authored
feat(driver): add AIO support for illumos & solaris (#330)
* feat(driver): AIO for illumos & solaris * fix(poll): simplify push_blocking loop
1 parent cfcc689 commit 8e9d097

File tree

5 files changed

+58
-39
lines changed

5 files changed

+58
-39
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ crossbeam-channel = "0.5.8"
4848
crossbeam-queue = "0.3.8"
4949
futures-channel = "0.3.29"
5050
futures-util = "0.3.29"
51-
libc = "0.2.149"
51+
libc = "0.2.164"
5252
nix = "0.29.0"
5353
once_cell = "1.18.0"
5454
os_pipe = "1.1.4"

compio-driver/build.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ fn main() {
1212
) },
1313
gnulinux: { all(target_os = "linux", target_env = "gnu") },
1414
freebsd: { target_os = "freebsd" },
15-
aio: { freebsd },
15+
solarish: { any(target_os = "illumos", target_os = "solaris") },
16+
aio: { any(freebsd, solarish) },
1617
}
1718
}

compio-driver/src/poll/mod.rs

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -244,57 +244,72 @@ impl Driver {
244244
Poll::Pending
245245
}
246246
Decision::Completed(res) => Poll::Ready(Ok(res)),
247-
Decision::Blocking => loop {
248-
if self.push_blocking(user_data) {
249-
break Poll::Pending;
250-
} else {
251-
self.poll_blocking();
252-
}
253-
},
247+
Decision::Blocking => self.push_blocking(user_data),
254248
#[cfg(aio)]
255249
Decision::Aio(AioControl { mut aiocbp, submit }) => {
256250
let aiocb = unsafe { aiocbp.as_mut() };
257-
// sigev_notify_kqueue
258-
aiocb.aio_sigevent.sigev_signo = self.poll.as_raw_fd();
259-
aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
260-
aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
251+
#[cfg(freebsd)]
252+
{
253+
// sigev_notify_kqueue
254+
aiocb.aio_sigevent.sigev_signo = self.poll.as_raw_fd();
255+
aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
256+
aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
257+
}
258+
#[cfg(solarish)]
259+
let mut notify = libc::port_notify {
260+
portnfy_port: self.poll.as_raw_fd(),
261+
portnfy_user: user_data as _,
262+
};
263+
#[cfg(solarish)]
264+
{
265+
aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
266+
aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
267+
}
261268
match syscall!(submit(aiocbp.as_ptr())) {
262269
Ok(_) => Poll::Pending,
270+
// FreeBSD:
271+
// * EOPNOTSUPP: It's on a filesystem without AIO support. Just fallback to
272+
// blocking IO.
273+
// * EAGAIN: The process-wide queue is full. No safe way to remove the (maybe)
274+
// dead entries.
275+
// Solarish:
276+
// * EAGAIN: Allocation failed.
263277
Err(e)
264278
if matches!(
265279
e.raw_os_error(),
266280
Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
267281
) =>
268282
{
269-
loop {
270-
if self.push_blocking(user_data) {
271-
return Poll::Pending;
272-
} else {
273-
self.poll_blocking();
274-
}
275-
}
283+
self.push_blocking(user_data)
276284
}
277285
Err(e) => Poll::Ready(Err(e)),
278286
}
279287
}
280288
}
281289
}
282290

283-
fn push_blocking(&mut self, user_data: usize) -> bool {
291+
fn push_blocking(&mut self, user_data: usize) -> Poll<io::Result<usize>> {
284292
let poll = self.poll.clone();
285293
let completed = self.pool_completed.clone();
286-
self.pool
287-
.dispatch(move || {
288-
let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
289-
let op_pin = op.as_op_pin();
290-
let res = match op_pin.operate() {
291-
Poll::Pending => unreachable!("this operation is not non-blocking"),
292-
Poll::Ready(res) => res,
293-
};
294-
completed.push(Entry::new(user_data, res));
295-
poll.notify().ok();
296-
})
297-
.is_ok()
294+
let mut closure = move || {
295+
let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
296+
let op_pin = op.as_op_pin();
297+
let res = match op_pin.operate() {
298+
Poll::Pending => unreachable!("this operation is not non-blocking"),
299+
Poll::Ready(res) => res,
300+
};
301+
completed.push(Entry::new(user_data, res));
302+
poll.notify().ok();
303+
};
304+
loop {
305+
match self.pool.dispatch(closure) {
306+
Ok(()) => return Poll::Pending,
307+
Err(e) => {
308+
closure = e.0;
309+
self.poll_blocking();
310+
}
311+
}
312+
}
298313
}
299314

300315
fn poll_blocking(&mut self) {

compio-fs/build.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ fn main() {
66
target_os = "freebsd",
77
target_os = "openbsd",
88
target_vendor = "apple"
9-
) }
9+
) },
10+
solarish: { any(target_os = "illumos", target_os = "solaris") },
1011
}
1112
}

compio-fs/src/file.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
use std::{future::Future, io, mem::ManuallyDrop, panic::resume_unwind, path::Path};
22

33
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
4+
#[cfg(unix)]
5+
use compio_driver::op::FileStat;
46
use compio_driver::{
57
ToSharedFd, impl_raw_fd,
68
op::{BufResultExt, CloseFile, ReadAt, Sync, WriteAt},
79
};
810
use compio_io::{AsyncReadAt, AsyncWriteAt};
911
use compio_runtime::Attacher;
10-
#[cfg(unix)]
12+
#[cfg(all(unix, not(solarish)))]
1113
use {
1214
compio_buf::{IoVectoredBuf, IoVectoredBufMut},
13-
compio_driver::op::{FileStat, ReadVectoredAt, WriteVectoredAt},
15+
compio_driver::op::{ReadVectoredAt, WriteVectoredAt},
1416
};
1517

1618
use crate::{Metadata, OpenOptions, Permissions};
@@ -155,7 +157,7 @@ impl AsyncReadAt for File {
155157
compio_runtime::submit(op).await.into_inner().map_advanced()
156158
}
157159

158-
#[cfg(unix)]
160+
#[cfg(all(unix, not(solarish)))]
159161
async fn read_vectored_at<T: IoVectoredBufMut>(
160162
&self,
161163
buffer: T,
@@ -173,7 +175,7 @@ impl AsyncWriteAt for File {
173175
(&*self).write_at(buf, pos).await
174176
}
175177

176-
#[cfg(unix)]
178+
#[cfg(all(unix, not(solarish)))]
177179
#[inline]
178180
async fn write_vectored_at<T: IoVectoredBuf>(
179181
&mut self,
@@ -191,7 +193,7 @@ impl AsyncWriteAt for &File {
191193
compio_runtime::submit(op).await.into_inner()
192194
}
193195

194-
#[cfg(unix)]
196+
#[cfg(all(unix, not(solarish)))]
195197
async fn write_vectored_at<T: IoVectoredBuf>(
196198
&mut self,
197199
buffer: T,

0 commit comments

Comments
 (0)