Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/backend/io_uring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1182,9 +1182,12 @@ test "io_uring: timerfd" {
var called = false;
var c: Completion = .{
.op = .{
.read = .{
// Note: we should be able to use `read` here but on
// Kernel 6.15.4 there is a bug that prevents the read
// from ever firing with io_uring. I don't know why. I changed
// this to a poll so tests pass, which should also be fine!
.poll = .{
.fd = t.fd,
.buffer = .{ .array = undefined },
},
},

Expand Down
79 changes: 74 additions & 5 deletions src/watcher/async.zig
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ fn AsyncEventFd(comptime xev: type) type {
// TODO: error handling
.freebsd => eventfd(
0,
0x100000, // EFD_CLOEXEC
0x100000 | 0x4, // EFD_CLOEXEC | EFD_NONBLOCK
),

// Use std.posix if we can.
else => try std.posix.eventfd(
0,
std.os.linux.EFD.CLOEXEC,
std.os.linux.EFD.CLOEXEC |
std.os.linux.EFD.NONBLOCK,
),
},
};
Expand All @@ -81,7 +82,13 @@ fn AsyncEventFd(comptime xev: type) type {
/// You should NOT register an async with multiple loops (the same loop
/// is fine -- but unnecessary). The behavior when waiting on multiple
/// loops is undefined.
pub fn wait(
pub const wait = switch (xev.backend) {
.io_uring, .epoll => waitPoll,
.kqueue => waitRead,
.iocp, .wasi_poll => @compileError("AsyncEventFd does not support wait for this backend"),
};

fn waitRead(
self: Self,
loop: *xev.Loop,
c: *xev.Completion,
Expand Down Expand Up @@ -122,6 +129,63 @@ fn AsyncEventFd(comptime xev: type) type {
loop.add(c);
}

fn waitPoll(
self: Self,
loop: *xev.Loop,
c: *xev.Completion,
comptime Userdata: type,
userdata: ?*Userdata,
comptime cb: *const fn (
ud: ?*Userdata,
l: *xev.Loop,
c: *xev.Completion,
r: WaitError!void,
) xev.CallbackAction,
) void {
c.* = .{
.op = .{
// We use a poll operation instead of a read operation
// because in Kernel 6.15.4, read was regressed for
// io_uring on eventfd/timerfd and would block forever.
// However, poll works fine.
.poll = .{
.fd = self.fd,
.events = posix.POLL.IN,
},
},

.userdata = userdata,
.callback = (struct {
fn callback(
ud: ?*anyopaque,
l_inner: *xev.Loop,
c_inner: *xev.Completion,
r: xev.Result,
) xev.CallbackAction {
if (r.poll) |_| {
// We need to read so that we can consume the
// eventfd value. We only read 8 bytes because
// we only write up to 8 bytes and we own the fd.
// We ignore errors here because we expect the
// read to succeed given we just polled it.
var buf: [8]u8 = undefined;
_ = posix.read(c_inner.op.poll.fd, &buf) catch {};
} else |_| {
// We'll call the callback with the error later.
}

return @call(.always_inline, cb, .{
common.userdataValue(Userdata, ud),
l_inner,
c_inner,
if (r.poll) |_| {} else |err| err,
});
}
}).callback,
};
loop.add(c);
}

/// Notify a loop to wake up synchronously. This should never block forever
/// (it will always EVENTUALLY succeed regardless of if the loop is currently
/// ticking or not).
Expand Down Expand Up @@ -670,16 +734,21 @@ fn AsyncTests(comptime xev: type, comptime Impl: type) type {
) xev.CallbackAction {
_ = r catch unreachable;
ud.?.* = true;
return .disarm;
return .rearm;
}
}).callback);

// Send a notification
try notifier.notify();

// Wait for wake
try loop.run(.until_done);
try loop.run(.once);
try testing.expect(wake);

// Make sure it only triggers once
wake = false;
try loop.run(.no_wait);
try testing.expect(!wake);
}

test "async: notify first" {
Expand Down
Loading