diff --git a/src/backend/io_uring.zig b/src/backend/io_uring.zig index deba023..17c0b9c 100644 --- a/src/backend/io_uring.zig +++ b/src/backend/io_uring.zig @@ -1182,9 +1182,12 @@ test "io_uring: timerfd" { var called = false; var c: Completion = .{ .op = .{ - .read = .{ + // Note: we should be able to use `read` here but on + // Kernel 6.15.4 there is a bug that prevents the read + // from ever firing with io_uring. I don't know why. I changed + // this to a poll so tests pass, which should also be fine! + .poll = .{ .fd = t.fd, - .buffer = .{ .array = undefined }, }, }, diff --git a/src/watcher/async.zig b/src/watcher/async.zig index f6e7e37..465a84f 100644 --- a/src/watcher/async.zig +++ b/src/watcher/async.zig @@ -53,13 +53,14 @@ fn AsyncEventFd(comptime xev: type) type { // TODO: error handling .freebsd => eventfd( 0, - 0x100000, // EFD_CLOEXEC + 0x100000 | 0x4, // EFD_CLOEXEC | EFD_NONBLOCK ), // Use std.posix if we can. else => try std.posix.eventfd( 0, - std.os.linux.EFD.CLOEXEC, + std.os.linux.EFD.CLOEXEC | + std.os.linux.EFD.NONBLOCK, ), }, }; @@ -81,7 +82,13 @@ fn AsyncEventFd(comptime xev: type) type { /// You should NOT register an async with multiple loops (the same loop /// is fine -- but unnecessary). The behavior when waiting on multiple /// loops is undefined. - pub fn wait( + pub const wait = switch (xev.backend) { + .io_uring, .epoll => waitPoll, + .kqueue => waitRead, + .iocp, .wasi_poll => @compileError("AsyncEventFd does not support wait for this backend"), + }; + + fn waitRead( self: Self, loop: *xev.Loop, c: *xev.Completion, @@ -122,6 +129,63 @@ fn AsyncEventFd(comptime xev: type) type { loop.add(c); } + fn waitPoll( + self: Self, + loop: *xev.Loop, + c: *xev.Completion, + comptime Userdata: type, + userdata: ?*Userdata, + comptime cb: *const fn ( + ud: ?*Userdata, + l: *xev.Loop, + c: *xev.Completion, + r: WaitError!void, + ) xev.CallbackAction, + ) void { + c.* = .{ + .op = .{ + // We use a poll operation instead of a read operation + // because in Kernel 6.15.4, read was regressed for + // io_uring on eventfd/timerfd and would block forever. + // However, poll works fine. + .poll = .{ + .fd = self.fd, + .events = posix.POLL.IN, + }, + }, + + .userdata = userdata, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l_inner: *xev.Loop, + c_inner: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + if (r.poll) |_| { + // We need to read so that we can consume the + // eventfd value. We only read 8 bytes because + // we only write up to 8 bytes and we own the fd. + // We ignore errors here because we expect the + // read to succeed given we just polled it. + var buf: [8]u8 = undefined; + _ = posix.read(c_inner.op.poll.fd, &buf) catch {}; + } else |_| { + // We'll call the callback with the error later. + } + + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, ud), + l_inner, + c_inner, + if (r.poll) |_| {} else |err| err, + }); + } + }).callback, + }; + loop.add(c); + } + /// Notify a loop to wake up synchronously. This should never block forever /// (it will always EVENTUALLY succeed regardless of if the loop is currently /// ticking or not). @@ -670,7 +734,7 @@ fn AsyncTests(comptime xev: type, comptime Impl: type) type { ) xev.CallbackAction { _ = r catch unreachable; ud.?.* = true; - return .disarm; + return .rearm; } }).callback); @@ -678,8 +742,13 @@ fn AsyncTests(comptime xev: type, comptime Impl: type) type { try notifier.notify(); // Wait for wake - try loop.run(.until_done); + try loop.run(.once); try testing.expect(wake); + + // Make sure it only triggers once + wake = false; + try loop.run(.no_wait); + try testing.expect(!wake); } test "async: notify first" {